diff options
Diffstat (limited to 'vendor/github.com')
13 files changed, 298 insertions, 303 deletions
diff --git a/vendor/github.com/vbauerster/mpb/v7/bar.go b/vendor/github.com/vbauerster/mpb/v7/bar.go index 646cb471a..4991f4f15 100644 --- a/vendor/github.com/vbauerster/mpb/v7/bar.go +++ b/vendor/github.com/vbauerster/mpb/v7/bar.go @@ -17,31 +17,21 @@ import ( // Bar represents a progress bar. type Bar struct { - priority int // used by heap - index int // used by heap - - toShutdown bool - toDrop bool - noPop bool - hasEwmaDecorators bool - operateState chan func(*bState) - frameCh chan *frame - - // cancel is called either by user or on complete event - cancel func() - // done is closed after cacheState is assigned - done chan struct{} - // cacheState is populated, right after close(b.done) - cacheState *bState - + index int // used by heap + priority int // used by heap + hasEwma bool + frameCh chan *renderFrame + operateState chan func(*bState) + done chan struct{} container *Progress + bs *bState + cancel func() recoveredPanic interface{} } type extenderFunc func(in io.Reader, reqWidth int, st decor.Statistics) (out io.Reader, lines int) -// bState is actual bar state. It gets passed to *Bar.serve(...) monitor -// goroutine. +// bState is actual bar's state. type bState struct { id int priority int @@ -52,7 +42,6 @@ type bState struct { lastIncrement int64 trimSpace bool completed bool - completeFlushed bool aborted bool triggerComplete bool dropOnComplete bool @@ -66,29 +55,28 @@ type bState struct { filler BarFiller middleware func(BarFiller) BarFiller extender extenderFunc + debugOut io.Writer - // runningBar is a key for *pState.parkedBars - runningBar *Bar - - debugOut io.Writer + afterBar *Bar // key for (*pState).queueBars + sync bool } -type frame struct { - reader io.Reader - lines int +type renderFrame struct { + reader io.Reader + lines int + shutdown bool } func newBar(container *Progress, bs *bState) *Bar { ctx, cancel := context.WithCancel(container.ctx) bar := &Bar{ - container: container, priority: bs.priority, - toDrop: bs.dropOnComplete, - noPop: bs.noPop, + hasEwma: len(bs.ewmaDecorators) != 0, + frameCh: make(chan *renderFrame, 1), operateState: make(chan func(*bState)), - frameCh: make(chan *frame, 1), done: make(chan struct{}), + container: container, cancel: cancel, } @@ -97,12 +85,20 @@ func newBar(container *Progress, bs *bState) *Bar { } // ProxyReader wraps r with metrics required for progress tracking. -// Panics if r is nil. +// If r is 'unknown total/size' reader it's mandatory to call +// (*Bar).SetTotal(-1, true) method after (Reader).Read returns io.EOF. +// Panics if r is nil. If bar is already completed or aborted, returns +// nil. func (b *Bar) ProxyReader(r io.Reader) io.ReadCloser { if r == nil { panic("expected non nil io.Reader") } - return b.newProxyReader(r) + select { + case <-b.done: + return nil + default: + return b.newProxyReader(r) + } } // ID returs id of the bar. @@ -112,18 +108,18 @@ func (b *Bar) ID() int { case b.operateState <- func(s *bState) { result <- s.id }: return <-result case <-b.done: - return b.cacheState.id + return b.bs.id } } -// Current returns bar's current number, in other words sum of all increments. +// Current returns bar's current value, in other words sum of all increments. func (b *Bar) Current() int64 { result := make(chan int64) select { case b.operateState <- func(s *bState) { result <- s.current }: return <-result case <-b.done: - return b.cacheState.current + return b.bs.current } } @@ -142,7 +138,7 @@ func (b *Bar) SetRefill(amount int64) { // TraverseDecorators traverses all available decorators and calls cb func on each. func (b *Bar) TraverseDecorators(cb func(decor.Decorator)) { - done := make(chan struct{}) + sync := make(chan struct{}) select { case b.operateState <- func(s *bState) { for _, decorators := range [...][]decor.Decorator{ @@ -153,28 +149,56 @@ func (b *Bar) TraverseDecorators(cb func(decor.Decorator)) { cb(extractBaseDecorator(d)) } } - close(done) + close(sync) + }: + <-sync + case <-b.done: + } +} + +// EnableTriggerComplete enables triggering complete event. It's +// effective only for bar which was constructed with `total <= 0` and +// after total has been set with (*Bar).SetTotal(int64, false). If bar +// has been incremented to the total, complete event is triggered right +// away. +func (b *Bar) EnableTriggerComplete() { + select { + case b.operateState <- func(s *bState) { + if s.triggerComplete || s.total <= 0 { + return + } + if s.current >= s.total { + s.current = s.total + s.completed = true + go b.forceRefresh() + } else { + s.triggerComplete = true + } }: - <-done case <-b.done: } } -// SetTotal sets total dynamically. -// If total is negative it takes progress' current value. -func (b *Bar) SetTotal(total int64, triggerComplete bool) { +// SetTotal sets total to an arbitrary value. It's effective only for +// bar which was constructed with `total <= 0`. Setting total to negative +// value is equivalent to (*Bar).SetTotal((*Bar).Current(), bool). +// If triggerCompleteNow is true, total value is set to current and +// complete event is triggered right away. +func (b *Bar) SetTotal(total int64, triggerCompleteNow bool) { select { case b.operateState <- func(s *bState) { - s.triggerComplete = triggerComplete + if s.triggerComplete { + return + } if total < 0 { s.total = s.current } else { s.total = total } - if s.triggerComplete && !s.completed { + if triggerCompleteNow { s.current = s.total s.completed = true - go b.forceRefreshIfLastUncompleted() + go b.forceRefresh() } }: case <-b.done: @@ -191,7 +215,7 @@ func (b *Bar) SetCurrent(current int64) { if s.triggerComplete && s.current >= s.total { s.current = s.total s.completed = true - go b.forceRefreshIfLastUncompleted() + go b.forceRefresh() } }: case <-b.done: @@ -220,7 +244,7 @@ func (b *Bar) IncrInt64(n int64) { if s.triggerComplete && s.current >= s.total { s.current = s.total s.completed = true - go b.forceRefreshIfLastUncompleted() + go b.forceRefresh() } }: case <-b.done: @@ -242,9 +266,9 @@ func (b *Bar) DecoratorEwmaUpdate(dur time.Duration) { } }: case <-b.done: - if b.cacheState.lastIncrement > 0 { - b.cacheState.decoratorEwmaUpdate(dur) - b.cacheState.lastIncrement = 0 + if b.bs.lastIncrement > 0 { + b.bs.decoratorEwmaUpdate(dur) + b.bs.lastIncrement = 0 } } } @@ -270,44 +294,33 @@ func (b *Bar) SetPriority(priority int) { // Abort interrupts bar's running goroutine. Abort won't be engaged // if bar is already in complete state. If drop is true bar will be -// removed as well. +// removed as well. To make sure that bar has been removed call +// (*Bar).Wait method. func (b *Bar) Abort(drop bool) { - done := make(chan struct{}) select { case b.operateState <- func(s *bState) { - if s.completed { - close(done) + if s.completed || s.aborted { return } s.aborted = true - b.cancel() - // container must be run during lifetime of this inner goroutine - // we control this by done channel declared above - go func() { - if drop { - b.container.dropBar(b) - } else { - var uncompleted int - b.container.traverseBars(func(bar *Bar) bool { - if b != bar && !bar.Completed() { - uncompleted++ - return false - } - return true - }) - if uncompleted == 0 { - b.container.refreshCh <- time.Now() - } - } - close(done) // release hold of Abort - }() + s.dropOnComplete = drop + go b.forceRefresh() }: - // guarantee: container is alive during lifetime of this hold - <-done case <-b.done: } } +// Aborted reports whether the bar is in aborted state. +func (b *Bar) Aborted() bool { + result := make(chan bool) + select { + case b.operateState <- func(s *bState) { result <- s.aborted }: + return <-result + case <-b.done: + return b.bs.aborted + } +} + // Completed reports whether the bar is in completed state. func (b *Bar) Completed() bool { result := make(chan bool) @@ -315,19 +328,28 @@ func (b *Bar) Completed() bool { case b.operateState <- func(s *bState) { result <- s.completed }: return <-result case <-b.done: - return true + return b.bs.completed } } -func (b *Bar) serve(ctx context.Context, s *bState) { +// Wait blocks until bar is completed or aborted. +func (b *Bar) Wait() { + <-b.done +} + +func (b *Bar) serve(ctx context.Context, bs *bState) { defer b.container.bwg.Done() + if bs.afterBar != nil && bs.sync { + bs.afterBar.Wait() + } for { select { case op := <-b.operateState: - op(s) + op(bs) case <-ctx.Done(): - s.decoratorShutdownNotify() - b.cacheState = s + bs.aborted = !bs.completed + bs.decoratorShutdownNotify() + b.bs = bs close(b.done) return } @@ -337,79 +359,62 @@ func (b *Bar) serve(ctx context.Context, s *bState) { func (b *Bar) render(tw int) { select { case b.operateState <- func(s *bState) { + var reader io.Reader + var lines int stat := newStatistics(tw, s) defer func() { // recovering if user defined decorator panics for example if p := recover(); p != nil { - if b.recoveredPanic == nil { - if s.debugOut != nil { - fmt.Fprintln(s.debugOut, p) - _, _ = s.debugOut.Write(debug.Stack()) - } - s.extender = makePanicExtender(p) - b.toShutdown = !b.toShutdown - b.recoveredPanic = p + if s.debugOut != nil { + fmt.Fprintln(s.debugOut, p) + _, _ = s.debugOut.Write(debug.Stack()) } - reader, lines := s.extender(nil, s.reqWidth, stat) - b.frameCh <- &frame{reader, lines + 1} + s.aborted = !s.completed + s.extender = makePanicExtender(p) + reader, lines = s.extender(nil, s.reqWidth, stat) + b.recoveredPanic = p + } + frame := renderFrame{ + reader: reader, + lines: lines + 1, + shutdown: s.completed || s.aborted, } - s.completeFlushed = s.completed + if frame.shutdown { + b.cancel() + } + b.frameCh <- &frame }() - reader, lines := s.extender(s.draw(stat), s.reqWidth, stat) - b.toShutdown = s.completed && !s.completeFlushed - b.frameCh <- &frame{reader, lines + 1} + if b.recoveredPanic == nil { + reader = s.draw(stat) + } + reader, lines = s.extender(reader, s.reqWidth, stat) }: case <-b.done: - s := b.cacheState - stat := newStatistics(tw, s) - var r io.Reader + var reader io.Reader + var lines int + stat, s := newStatistics(tw, b.bs), b.bs if b.recoveredPanic == nil { - r = s.draw(stat) - } - reader, lines := s.extender(r, s.reqWidth, stat) - b.frameCh <- &frame{reader, lines + 1} - } -} - -func (b *Bar) subscribeDecorators() { - var averageDecorators []decor.AverageDecorator - var ewmaDecorators []decor.EwmaDecorator - var shutdownListeners []decor.ShutdownListener - b.TraverseDecorators(func(d decor.Decorator) { - if d, ok := d.(decor.AverageDecorator); ok { - averageDecorators = append(averageDecorators, d) - } - if d, ok := d.(decor.EwmaDecorator); ok { - ewmaDecorators = append(ewmaDecorators, d) + reader = s.draw(stat) } - if d, ok := d.(decor.ShutdownListener); ok { - shutdownListeners = append(shutdownListeners, d) + reader, lines = s.extender(reader, s.reqWidth, stat) + b.frameCh <- &renderFrame{ + reader: reader, + lines: lines + 1, } - }) - b.hasEwmaDecorators = len(ewmaDecorators) != 0 - select { - case b.operateState <- func(s *bState) { - s.averageDecorators = averageDecorators - s.ewmaDecorators = ewmaDecorators - s.shutdownListeners = shutdownListeners - }: - case <-b.done: } } -func (b *Bar) forceRefreshIfLastUncompleted() { - var uncompleted int +func (b *Bar) forceRefresh() { + var anyOtherRunning bool b.container.traverseBars(func(bar *Bar) bool { - if b != bar && !bar.Completed() { - uncompleted++ - return false - } - return true + anyOtherRunning = b != bar && bar.isRunning() + return !anyOtherRunning }) - if uncompleted == 0 { + if !anyOtherRunning { for { select { case b.container.refreshCh <- time.Now(): + time.Sleep(prr) case <-b.done: return } @@ -417,13 +422,25 @@ func (b *Bar) forceRefreshIfLastUncompleted() { } } +func (b *Bar) isRunning() bool { + result := make(chan bool) + select { + case b.operateState <- func(s *bState) { + result <- !s.completed && !s.aborted + }: + return <-result + case <-b.done: + return false + } +} + func (b *Bar) wSyncTable() [][]chan int { result := make(chan [][]chan int) select { case b.operateState <- func(s *bState) { result <- s.wSyncTable() }: return <-result case <-b.done: - return b.cacheState.wSyncTable() + return b.bs.wSyncTable() } } @@ -487,6 +504,26 @@ func (s *bState) wSyncTable() [][]chan int { return table } +func (s *bState) subscribeDecorators() { + for _, decorators := range [...][]decor.Decorator{ + s.pDecorators, + s.aDecorators, + } { + for _, d := range decorators { + d = extractBaseDecorator(d) + if d, ok := d.(decor.AverageDecorator); ok { + s.averageDecorators = append(s.averageDecorators, d) + } + if d, ok := d.(decor.EwmaDecorator); ok { + s.ewmaDecorators = append(s.ewmaDecorators, d) + } + if d, ok := d.(decor.ShutdownListener); ok { + s.shutdownListeners = append(s.shutdownListeners, d) + } + } + } +} + func (s bState) decoratorEwmaUpdate(dur time.Duration) { wg := new(sync.WaitGroup) for i := 0; i < len(s.ewmaDecorators); i++ { @@ -540,12 +577,12 @@ func (s bState) decoratorShutdownNotify() { func newStatistics(tw int, s *bState) decor.Statistics { return decor.Statistics{ - ID: s.id, AvailableWidth: tw, + ID: s.id, Total: s.total, Current: s.current, Refill: s.refill, - Completed: s.completeFlushed, + Completed: s.completed, Aborted: s.aborted, } } diff --git a/vendor/github.com/vbauerster/mpb/v7/bar_filler_bar.go b/vendor/github.com/vbauerster/mpb/v7/bar_filler_bar.go index 54b7bfd6f..d8bf90a4a 100644 --- a/vendor/github.com/vbauerster/mpb/v7/bar_filler_bar.go +++ b/vendor/github.com/vbauerster/mpb/v7/bar_filler_bar.go @@ -157,9 +157,8 @@ func (s *bFiller) Fill(w io.Writer, width int, stat decor.Statistics) { return } - ow := optimisticWriter(w) - ow(s.components[iLbound].bytes) - defer ow(s.components[iRbound].bytes) + mustWrite(w, s.components[iLbound].bytes) + defer mustWrite(w, s.components[iRbound].bytes) if width == 0 { return @@ -231,26 +230,24 @@ func (s *bFiller) Fill(w io.Writer, width int, stat decor.Statistics) { } if s.rev { - flush(ow, padding, filling) + flush(w, padding, filling) } else { - flush(ow, filling, padding) + flush(w, filling, padding) } } -func flush(ow func([]byte), filling, padding [][]byte) { +func flush(w io.Writer, filling, padding [][]byte) { for i := len(filling) - 1; i >= 0; i-- { - ow(filling[i]) + mustWrite(w, filling[i]) } for i := 0; i < len(padding); i++ { - ow(padding[i]) + mustWrite(w, padding[i]) } } -func optimisticWriter(w io.Writer) func([]byte) { - return func(p []byte) { - _, err := w.Write(p) - if err != nil { - panic(err) - } +func mustWrite(w io.Writer, p []byte) { + _, err := w.Write(p) + if err != nil { + panic(err) } } diff --git a/vendor/github.com/vbauerster/mpb/v7/bar_option.go b/vendor/github.com/vbauerster/mpb/v7/bar_option.go index 4ba490505..8599f0a57 100644 --- a/vendor/github.com/vbauerster/mpb/v7/bar_option.go +++ b/vendor/github.com/vbauerster/mpb/v7/bar_option.go @@ -59,14 +59,17 @@ func BarWidth(width int) BarOption { } } -// BarQueueAfter queues this (being constructed) bar to relplace -// runningBar after it has been completed. -func BarQueueAfter(runningBar *Bar) BarOption { - if runningBar == nil { +// BarQueueAfter puts this (being constructed) bar into the queue. +// When argument bar completes or aborts queued bar replaces its place. +// If sync is true queued bar is suspended until argument bar completes +// or aborts. +func BarQueueAfter(bar *Bar, sync bool) BarOption { + if bar == nil { return nil } return func(s *bState) { - s.runningBar = runningBar + s.afterBar = bar + s.sync = sync } } diff --git a/vendor/github.com/vbauerster/mpb/v7/cwriter/writer.go b/vendor/github.com/vbauerster/mpb/v7/cwriter/writer.go index eaf541cb7..fac15b3bc 100644 --- a/vendor/github.com/vbauerster/mpb/v7/cwriter/writer.go +++ b/vendor/github.com/vbauerster/mpb/v7/cwriter/writer.go @@ -11,7 +11,7 @@ import ( // ErrNotTTY not a TeleTYpewriter error. var ErrNotTTY = errors.New("not a terminal") -// http://ascii-table.com/ansi-escape-sequences.php +// https://github.com/dylanaraps/pure-sh-bible#cursor-movement const ( escOpen = "\x1b[" cuuAndEd = "A\x1b[J" diff --git a/vendor/github.com/vbauerster/mpb/v7/decor/decorator.go b/vendor/github.com/vbauerster/mpb/v7/decor/decorator.go index 9fec57b15..aad7709c0 100644 --- a/vendor/github.com/vbauerster/mpb/v7/decor/decorator.go +++ b/vendor/github.com/vbauerster/mpb/v7/decor/decorator.go @@ -47,8 +47,8 @@ const ( // Statistics consists of progress related statistics, that Decorator // may need. type Statistics struct { - ID int AvailableWidth int + ID int Total int64 Current int64 Refill int64 diff --git a/vendor/github.com/vbauerster/mpb/v7/decor/optimistic_string_writer.go b/vendor/github.com/vbauerster/mpb/v7/decor/optimistic_string_writer.go index ea9fda79d..c6a34384e 100644 --- a/vendor/github.com/vbauerster/mpb/v7/decor/optimistic_string_writer.go +++ b/vendor/github.com/vbauerster/mpb/v7/decor/optimistic_string_writer.go @@ -2,11 +2,9 @@ package decor import "io" -func optimisticStringWriter(w io.Writer) func(string) { - return func(s string) { - _, err := io.WriteString(w, s) - if err != nil { - panic(err) - } +func mustWriteString(w io.Writer, s string) { + _, err := io.WriteString(w, s) + if err != nil { + panic(err) } } diff --git a/vendor/github.com/vbauerster/mpb/v7/decor/percentage.go b/vendor/github.com/vbauerster/mpb/v7/decor/percentage.go index 6e7f5c6ed..e72668993 100644 --- a/vendor/github.com/vbauerster/mpb/v7/decor/percentage.go +++ b/vendor/github.com/vbauerster/mpb/v7/decor/percentage.go @@ -23,12 +23,11 @@ func (s percentageType) Format(st fmt.State, verb rune) { } } - osw := optimisticStringWriter(st) - osw(strconv.FormatFloat(float64(s), 'f', prec, 64)) + mustWriteString(st, strconv.FormatFloat(float64(s), 'f', prec, 64)) if st.Flag(' ') { - osw(" ") + mustWriteString(st, " ") } - osw("%") + mustWriteString(st, "%") } // Percentage returns percentage decorator. It's a wrapper of NewPercentage. diff --git a/vendor/github.com/vbauerster/mpb/v7/decor/size_type.go b/vendor/github.com/vbauerster/mpb/v7/decor/size_type.go index 12879b8f1..09ecc23f8 100644 --- a/vendor/github.com/vbauerster/mpb/v7/decor/size_type.go +++ b/vendor/github.com/vbauerster/mpb/v7/decor/size_type.go @@ -49,12 +49,11 @@ func (self SizeB1024) Format(st fmt.State, verb rune) { unit = _iTiB } - osw := optimisticStringWriter(st) - osw(strconv.FormatFloat(float64(self)/float64(unit), 'f', prec, 64)) + mustWriteString(st, strconv.FormatFloat(float64(self)/float64(unit), 'f', prec, 64)) if st.Flag(' ') { - osw(" ") + mustWriteString(st, " ") } - osw(unit.String()) + mustWriteString(st, unit.String()) } const ( @@ -98,10 +97,9 @@ func (self SizeB1000) Format(st fmt.State, verb rune) { unit = _TB } - osw := optimisticStringWriter(st) - osw(strconv.FormatFloat(float64(self)/float64(unit), 'f', prec, 64)) + mustWriteString(st, strconv.FormatFloat(float64(self)/float64(unit), 'f', prec, 64)) if st.Flag(' ') { - osw(" ") + mustWriteString(st, " ") } - osw(unit.String()) + mustWriteString(st, unit.String()) } diff --git a/vendor/github.com/vbauerster/mpb/v7/decor/speed.go b/vendor/github.com/vbauerster/mpb/v7/decor/speed.go index 99cfde2bf..f052352fc 100644 --- a/vendor/github.com/vbauerster/mpb/v7/decor/speed.go +++ b/vendor/github.com/vbauerster/mpb/v7/decor/speed.go @@ -23,7 +23,7 @@ type speedFormatter struct { func (self *speedFormatter) Format(st fmt.State, verb rune) { self.Formatter.Format(st, verb) - optimisticStringWriter(st)("/s") + mustWriteString(st, "/s") } // EwmaSpeed exponential-weighted-moving-average based speed decorator. diff --git a/vendor/github.com/vbauerster/mpb/v7/go.mod b/vendor/github.com/vbauerster/mpb/v7/go.mod index 8fa790dc7..db1457e35 100644 --- a/vendor/github.com/vbauerster/mpb/v7/go.mod +++ b/vendor/github.com/vbauerster/mpb/v7/go.mod @@ -4,7 +4,7 @@ require ( github.com/VividCortex/ewma v1.2.0 github.com/acarl005/stripansi v0.0.0-20180116102854-5a71ef0e047d github.com/mattn/go-runewidth v0.0.13 - golang.org/x/sys v0.0.0-20220114195835-da31bd327af9 + golang.org/x/sys v0.0.0-20220209214540-3681064d5158 ) go 1.14 diff --git a/vendor/github.com/vbauerster/mpb/v7/go.sum b/vendor/github.com/vbauerster/mpb/v7/go.sum index aebe4d9d2..f36888be9 100644 --- a/vendor/github.com/vbauerster/mpb/v7/go.sum +++ b/vendor/github.com/vbauerster/mpb/v7/go.sum @@ -6,5 +6,5 @@ github.com/mattn/go-runewidth v0.0.13 h1:lTGmDsbAYt5DmK6OnoV7EuIF1wEIFAcxld6ypU4 github.com/mattn/go-runewidth v0.0.13/go.mod h1:Jdepj2loyihRzMpdS35Xk/zdY8IAYHsh153qUoGf23w= github.com/rivo/uniseg v0.2.0 h1:S1pD9weZBuJdFmowNwbpi7BJ8TNftyUImj/0WQi72jY= github.com/rivo/uniseg v0.2.0/go.mod h1:J6wj4VEh+S6ZtnVlnTBMWIodfgj8LQOQFoIToxlJtxc= -golang.org/x/sys v0.0.0-20220114195835-da31bd327af9 h1:XfKQ4OlFl8okEOr5UvAqFRVj8pY/4yfcXrddB8qAbU0= -golang.org/x/sys v0.0.0-20220114195835-da31bd327af9/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20220209214540-3681064d5158 h1:rm+CHSpPEEW2IsXUib1ThaHIjuBVZjxNgSKmBLFfD4c= +golang.org/x/sys v0.0.0-20220209214540-3681064d5158/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= diff --git a/vendor/github.com/vbauerster/mpb/v7/progress.go b/vendor/github.com/vbauerster/mpb/v7/progress.go index 123af17cf..1d9a53e5c 100644 --- a/vendor/github.com/vbauerster/mpb/v7/progress.go +++ b/vendor/github.com/vbauerster/mpb/v7/progress.go @@ -16,12 +16,10 @@ import ( ) const ( - // default RefreshRate - prr = 150 * time.Millisecond + prr = 150 * time.Millisecond // default RefreshRate ) -// Progress represents a container that renders one or more progress -// bars. +// Progress represents a container that renders one or more progress bars. type Progress struct { ctx context.Context uwg *sync.WaitGroup @@ -33,14 +31,12 @@ type Progress struct { once sync.Once } -// pState holds bars in its priorityQueue. It gets passed to -// *Progress.serve(...) monitor goroutine. +// pState holds bars in its priorityQueue, it gets passed to (*Progress).serve monitor goroutine. type pState struct { - bHeap priorityQueue - heapUpdated bool - pMatrix map[int][]chan int - aMatrix map[int][]chan int - barShutdownQueue []*Bar + bHeap priorityQueue + heapUpdated bool + pMatrix map[int][]chan int + aMatrix map[int][]chan int // following are provided/overrided by user idCount int @@ -52,26 +48,26 @@ type pState struct { externalRefresh <-chan interface{} renderDelay <-chan struct{} shutdownNotifier chan struct{} - parkedBars map[*Bar]*Bar + queueBars map[*Bar]*Bar output io.Writer debugOut io.Writer } // New creates new Progress container instance. It's not possible to -// reuse instance after *Progress.Wait() method has been called. +// reuse instance after (*Progress).Wait method has been called. func New(options ...ContainerOption) *Progress { return NewWithContext(context.Background(), options...) } // NewWithContext creates new Progress container instance with provided -// context. It's not possible to reuse instance after *Progress.Wait() +// context. It's not possible to reuse instance after (*Progress).Wait // method has been called. func NewWithContext(ctx context.Context, options ...ContainerOption) *Progress { s := &pState{ - bHeap: priorityQueue{}, - rr: prr, - parkedBars: make(map[*Bar]*Bar), - output: os.Stdout, + bHeap: priorityQueue{}, + rr: prr, + queueBars: make(map[*Bar]*Bar), + output: os.Stdout, } for _, opt := range options { @@ -110,8 +106,8 @@ func (p *Progress) New(total int64, builder BarFillerBuilder, options ...BarOpti } // Add creates a bar which renders itself by provided filler. -// If `total <= 0` trigger complete event is disabled until reset with *bar.SetTotal(int64, bool). -// Panics if *Progress instance is done, i.e. called after *Progress.Wait(). +// If `total <= 0` triggering complete event by increment methods is disabled. +// Panics if *Progress instance is done, i.e. called after (*Progress).Wait(). func (p *Progress) Add(total int64, filler BarFiller, options ...BarOption) *Bar { if filler == nil { filler = NopStyle().Build() @@ -122,9 +118,8 @@ func (p *Progress) Add(total int64, filler BarFiller, options ...BarOption) *Bar case p.operateState <- func(ps *pState) { bs := ps.makeBarState(total, filler, options...) bar := newBar(p, bs) - if bs.runningBar != nil { - bs.runningBar.noPop = true - ps.parkedBars[bs.runningBar] = bar + if bs.afterBar != nil { + ps.queueBars[bs.afterBar] = bar } else { heap.Push(&ps.bHeap, bar) ps.heapUpdated = true @@ -133,7 +128,6 @@ func (p *Progress) Add(total int64, filler BarFiller, options ...BarOption) *Bar result <- bar }: bar := <-result - bar.subscribeDecorators() return bar case <-p.done: p.bwg.Done() @@ -141,21 +135,8 @@ func (p *Progress) Add(total int64, filler BarFiller, options ...BarOption) *Bar } } -func (p *Progress) dropBar(b *Bar) { - select { - case p.operateState <- func(s *pState) { - if b.index < 0 { - return - } - heap.Remove(&s.bHeap, b.index) - s.heapUpdated = true - }: - case <-p.done: - } -} - func (p *Progress) traverseBars(cb func(b *Bar) bool) { - done := make(chan struct{}) + sync := make(chan struct{}) select { case p.operateState <- func(s *pState) { for i := 0; i < s.bHeap.Len(); i++ { @@ -164,9 +145,9 @@ func (p *Progress) traverseBars(cb func(b *Bar) bool) { break } } - close(done) + close(sync) }: - <-done + <-sync case <-p.done: } } @@ -200,8 +181,8 @@ func (p *Progress) BarCount() int { // After this method has been called, there is no way to reuse *Progress // instance. func (p *Progress) Wait() { + // wait for user wg, if any if p.uwg != nil { - // wait for user wg p.uwg.Wait() } @@ -256,6 +237,64 @@ func (p *Progress) serve(s *pState, cw *cwriter.Writer) { } } +func (s *pState) render(cw *cwriter.Writer) error { + if s.heapUpdated { + s.updateSyncMatrix() + s.heapUpdated = false + } + syncWidth(s.pMatrix) + syncWidth(s.aMatrix) + + tw, err := cw.GetWidth() + if err != nil { + tw = s.reqWidth + } + for i := 0; i < s.bHeap.Len(); i++ { + bar := s.bHeap[i] + go bar.render(tw) + } + + return s.flush(cw) +} + +func (s *pState) flush(cw *cwriter.Writer) error { + var lines int + pool := make([]*Bar, 0, s.bHeap.Len()) + for s.bHeap.Len() > 0 { + b := heap.Pop(&s.bHeap).(*Bar) + frame := <-b.frameCh + lines += frame.lines + _, err := cw.ReadFrom(frame.reader) + if err != nil { + return err + } + if frame.shutdown { + b.Wait() // waiting for b.done, so it's safe to read b.bs + var toDrop bool + if qb, ok := s.queueBars[b]; ok { + delete(s.queueBars, b) + qb.priority = b.priority + pool = append(pool, qb) + toDrop = true + } else if s.popCompleted && !b.bs.noPop { + lines -= frame.lines + toDrop = true + } + if toDrop || b.bs.dropOnComplete { + s.heapUpdated = true + continue + } + } + pool = append(pool, b) + } + + for _, b := range pool { + heap.Push(&s.bHeap, b) + } + + return cw.Flush(lines) +} + func (s *pState) newTicker(done <-chan struct{}) chan time.Time { ch := make(chan time.Time) if s.shutdownNotifier == nil { @@ -294,78 +333,6 @@ func (s *pState) newTicker(done <-chan struct{}) chan time.Time { return ch } -func (s *pState) render(cw *cwriter.Writer) error { - if s.heapUpdated { - s.updateSyncMatrix() - s.heapUpdated = false - } - syncWidth(s.pMatrix) - syncWidth(s.aMatrix) - - tw, err := cw.GetWidth() - if err != nil { - tw = s.reqWidth - } - for i := 0; i < s.bHeap.Len(); i++ { - bar := s.bHeap[i] - go bar.render(tw) - } - - return s.flush(cw) -} - -func (s *pState) flush(cw *cwriter.Writer) error { - var totalLines int - bm := make(map[*Bar]int, s.bHeap.Len()) - for s.bHeap.Len() > 0 { - b := heap.Pop(&s.bHeap).(*Bar) - frame := <-b.frameCh - _, err := cw.ReadFrom(frame.reader) - if err != nil { - return err - } - if b.toShutdown { - if b.recoveredPanic != nil { - s.barShutdownQueue = append(s.barShutdownQueue, b) - b.toShutdown = false - } else { - // shutdown at next flush - // this ensures no bar ends up with less than 100% rendered - defer func() { - s.barShutdownQueue = append(s.barShutdownQueue, b) - }() - } - } - bm[b] = frame.lines - totalLines += frame.lines - } - - for _, b := range s.barShutdownQueue { - if parkedBar := s.parkedBars[b]; parkedBar != nil { - parkedBar.priority = b.priority - heap.Push(&s.bHeap, parkedBar) - delete(s.parkedBars, b) - b.toDrop = true - } - if s.popCompleted && !b.noPop { - totalLines -= bm[b] - b.toDrop = true - } - if b.toDrop { - delete(bm, b) - s.heapUpdated = true - } - b.cancel() - } - s.barShutdownQueue = s.barShutdownQueue[0:0] - - for b := range bm { - heap.Push(&s.bHeap, b) - } - - return cw.Flush(totalLines) -} - func (s *pState) updateSyncMatrix() { s.pMatrix = make(map[int][]chan int) s.aMatrix = make(map[int][]chan int) @@ -418,6 +385,8 @@ func (s *pState) makeBarState(total int64, filler BarFiller, options ...BarOptio bs.buffers[i] = bytes.NewBuffer(make([]byte, 0, 512)) } + bs.subscribeDecorators() + return bs } @@ -427,7 +396,7 @@ func syncWidth(matrix map[int][]chan int) { } } -var maxWidthDistributor = func(column []chan int) { +func maxWidthDistributor(column []chan int) { var maxWidth int for _, ch := range column { if w := <-ch; w > maxWidth { diff --git a/vendor/github.com/vbauerster/mpb/v7/proxyreader.go b/vendor/github.com/vbauerster/mpb/v7/proxyreader.go index 25f195bb8..b0dd89d45 100644 --- a/vendor/github.com/vbauerster/mpb/v7/proxyreader.go +++ b/vendor/github.com/vbauerster/mpb/v7/proxyreader.go @@ -14,9 +14,6 @@ type proxyReader struct { func (x proxyReader) Read(p []byte) (int, error) { n, err := x.ReadCloser.Read(p) x.bar.IncrBy(n) - if err == io.EOF { - go x.bar.SetTotal(-1, true) - } return n, err } @@ -28,9 +25,6 @@ type proxyWriterTo struct { func (x proxyWriterTo) WriteTo(w io.Writer) (int64, error) { n, err := x.wt.WriteTo(w) x.bar.IncrInt64(n) - if err == io.EOF { - go x.bar.SetTotal(-1, true) - } return n, err } @@ -65,12 +59,12 @@ func (b *Bar) newProxyReader(r io.Reader) (rc io.ReadCloser) { pr := proxyReader{toReadCloser(r), b} if wt, ok := r.(io.WriterTo); ok { pw := proxyWriterTo{pr, wt} - if b.hasEwmaDecorators { + if b.hasEwma { rc = ewmaProxyWriterTo{ewmaProxyReader{pr}, pw} } else { rc = pw } - } else if b.hasEwmaDecorators { + } else if b.hasEwma { rc = ewmaProxyReader{pr} } else { rc = pr |