diff options
Diffstat (limited to 'vendor/github.com/vbauerster/mpb/v7/bar.go')
-rw-r--r-- | vendor/github.com/vbauerster/mpb/v7/bar.go | 313 |
1 files changed, 175 insertions, 138 deletions
diff --git a/vendor/github.com/vbauerster/mpb/v7/bar.go b/vendor/github.com/vbauerster/mpb/v7/bar.go index 646cb471a..4991f4f15 100644 --- a/vendor/github.com/vbauerster/mpb/v7/bar.go +++ b/vendor/github.com/vbauerster/mpb/v7/bar.go @@ -17,31 +17,21 @@ import ( // Bar represents a progress bar. type Bar struct { - priority int // used by heap - index int // used by heap - - toShutdown bool - toDrop bool - noPop bool - hasEwmaDecorators bool - operateState chan func(*bState) - frameCh chan *frame - - // cancel is called either by user or on complete event - cancel func() - // done is closed after cacheState is assigned - done chan struct{} - // cacheState is populated, right after close(b.done) - cacheState *bState - + index int // used by heap + priority int // used by heap + hasEwma bool + frameCh chan *renderFrame + operateState chan func(*bState) + done chan struct{} container *Progress + bs *bState + cancel func() recoveredPanic interface{} } type extenderFunc func(in io.Reader, reqWidth int, st decor.Statistics) (out io.Reader, lines int) -// bState is actual bar state. It gets passed to *Bar.serve(...) monitor -// goroutine. +// bState is actual bar's state. type bState struct { id int priority int @@ -52,7 +42,6 @@ type bState struct { lastIncrement int64 trimSpace bool completed bool - completeFlushed bool aborted bool triggerComplete bool dropOnComplete bool @@ -66,29 +55,28 @@ type bState struct { filler BarFiller middleware func(BarFiller) BarFiller extender extenderFunc + debugOut io.Writer - // runningBar is a key for *pState.parkedBars - runningBar *Bar - - debugOut io.Writer + afterBar *Bar // key for (*pState).queueBars + sync bool } -type frame struct { - reader io.Reader - lines int +type renderFrame struct { + reader io.Reader + lines int + shutdown bool } func newBar(container *Progress, bs *bState) *Bar { ctx, cancel := context.WithCancel(container.ctx) bar := &Bar{ - container: container, priority: bs.priority, - toDrop: bs.dropOnComplete, - noPop: bs.noPop, + hasEwma: len(bs.ewmaDecorators) != 0, + frameCh: make(chan *renderFrame, 1), operateState: make(chan func(*bState)), - frameCh: make(chan *frame, 1), done: make(chan struct{}), + container: container, cancel: cancel, } @@ -97,12 +85,20 @@ func newBar(container *Progress, bs *bState) *Bar { } // ProxyReader wraps r with metrics required for progress tracking. -// Panics if r is nil. +// If r is 'unknown total/size' reader it's mandatory to call +// (*Bar).SetTotal(-1, true) method after (Reader).Read returns io.EOF. +// Panics if r is nil. If bar is already completed or aborted, returns +// nil. func (b *Bar) ProxyReader(r io.Reader) io.ReadCloser { if r == nil { panic("expected non nil io.Reader") } - return b.newProxyReader(r) + select { + case <-b.done: + return nil + default: + return b.newProxyReader(r) + } } // ID returs id of the bar. @@ -112,18 +108,18 @@ func (b *Bar) ID() int { case b.operateState <- func(s *bState) { result <- s.id }: return <-result case <-b.done: - return b.cacheState.id + return b.bs.id } } -// Current returns bar's current number, in other words sum of all increments. +// Current returns bar's current value, in other words sum of all increments. func (b *Bar) Current() int64 { result := make(chan int64) select { case b.operateState <- func(s *bState) { result <- s.current }: return <-result case <-b.done: - return b.cacheState.current + return b.bs.current } } @@ -142,7 +138,7 @@ func (b *Bar) SetRefill(amount int64) { // TraverseDecorators traverses all available decorators and calls cb func on each. func (b *Bar) TraverseDecorators(cb func(decor.Decorator)) { - done := make(chan struct{}) + sync := make(chan struct{}) select { case b.operateState <- func(s *bState) { for _, decorators := range [...][]decor.Decorator{ @@ -153,28 +149,56 @@ func (b *Bar) TraverseDecorators(cb func(decor.Decorator)) { cb(extractBaseDecorator(d)) } } - close(done) + close(sync) + }: + <-sync + case <-b.done: + } +} + +// EnableTriggerComplete enables triggering complete event. It's +// effective only for bar which was constructed with `total <= 0` and +// after total has been set with (*Bar).SetTotal(int64, false). If bar +// has been incremented to the total, complete event is triggered right +// away. +func (b *Bar) EnableTriggerComplete() { + select { + case b.operateState <- func(s *bState) { + if s.triggerComplete || s.total <= 0 { + return + } + if s.current >= s.total { + s.current = s.total + s.completed = true + go b.forceRefresh() + } else { + s.triggerComplete = true + } }: - <-done case <-b.done: } } -// SetTotal sets total dynamically. -// If total is negative it takes progress' current value. -func (b *Bar) SetTotal(total int64, triggerComplete bool) { +// SetTotal sets total to an arbitrary value. It's effective only for +// bar which was constructed with `total <= 0`. Setting total to negative +// value is equivalent to (*Bar).SetTotal((*Bar).Current(), bool). +// If triggerCompleteNow is true, total value is set to current and +// complete event is triggered right away. +func (b *Bar) SetTotal(total int64, triggerCompleteNow bool) { select { case b.operateState <- func(s *bState) { - s.triggerComplete = triggerComplete + if s.triggerComplete { + return + } if total < 0 { s.total = s.current } else { s.total = total } - if s.triggerComplete && !s.completed { + if triggerCompleteNow { s.current = s.total s.completed = true - go b.forceRefreshIfLastUncompleted() + go b.forceRefresh() } }: case <-b.done: @@ -191,7 +215,7 @@ func (b *Bar) SetCurrent(current int64) { if s.triggerComplete && s.current >= s.total { s.current = s.total s.completed = true - go b.forceRefreshIfLastUncompleted() + go b.forceRefresh() } }: case <-b.done: @@ -220,7 +244,7 @@ func (b *Bar) IncrInt64(n int64) { if s.triggerComplete && s.current >= s.total { s.current = s.total s.completed = true - go b.forceRefreshIfLastUncompleted() + go b.forceRefresh() } }: case <-b.done: @@ -242,9 +266,9 @@ func (b *Bar) DecoratorEwmaUpdate(dur time.Duration) { } }: case <-b.done: - if b.cacheState.lastIncrement > 0 { - b.cacheState.decoratorEwmaUpdate(dur) - b.cacheState.lastIncrement = 0 + if b.bs.lastIncrement > 0 { + b.bs.decoratorEwmaUpdate(dur) + b.bs.lastIncrement = 0 } } } @@ -270,44 +294,33 @@ func (b *Bar) SetPriority(priority int) { // Abort interrupts bar's running goroutine. Abort won't be engaged // if bar is already in complete state. If drop is true bar will be -// removed as well. +// removed as well. To make sure that bar has been removed call +// (*Bar).Wait method. func (b *Bar) Abort(drop bool) { - done := make(chan struct{}) select { case b.operateState <- func(s *bState) { - if s.completed { - close(done) + if s.completed || s.aborted { return } s.aborted = true - b.cancel() - // container must be run during lifetime of this inner goroutine - // we control this by done channel declared above - go func() { - if drop { - b.container.dropBar(b) - } else { - var uncompleted int - b.container.traverseBars(func(bar *Bar) bool { - if b != bar && !bar.Completed() { - uncompleted++ - return false - } - return true - }) - if uncompleted == 0 { - b.container.refreshCh <- time.Now() - } - } - close(done) // release hold of Abort - }() + s.dropOnComplete = drop + go b.forceRefresh() }: - // guarantee: container is alive during lifetime of this hold - <-done case <-b.done: } } +// Aborted reports whether the bar is in aborted state. +func (b *Bar) Aborted() bool { + result := make(chan bool) + select { + case b.operateState <- func(s *bState) { result <- s.aborted }: + return <-result + case <-b.done: + return b.bs.aborted + } +} + // Completed reports whether the bar is in completed state. func (b *Bar) Completed() bool { result := make(chan bool) @@ -315,19 +328,28 @@ func (b *Bar) Completed() bool { case b.operateState <- func(s *bState) { result <- s.completed }: return <-result case <-b.done: - return true + return b.bs.completed } } -func (b *Bar) serve(ctx context.Context, s *bState) { +// Wait blocks until bar is completed or aborted. +func (b *Bar) Wait() { + <-b.done +} + +func (b *Bar) serve(ctx context.Context, bs *bState) { defer b.container.bwg.Done() + if bs.afterBar != nil && bs.sync { + bs.afterBar.Wait() + } for { select { case op := <-b.operateState: - op(s) + op(bs) case <-ctx.Done(): - s.decoratorShutdownNotify() - b.cacheState = s + bs.aborted = !bs.completed + bs.decoratorShutdownNotify() + b.bs = bs close(b.done) return } @@ -337,79 +359,62 @@ func (b *Bar) serve(ctx context.Context, s *bState) { func (b *Bar) render(tw int) { select { case b.operateState <- func(s *bState) { + var reader io.Reader + var lines int stat := newStatistics(tw, s) defer func() { // recovering if user defined decorator panics for example if p := recover(); p != nil { - if b.recoveredPanic == nil { - if s.debugOut != nil { - fmt.Fprintln(s.debugOut, p) - _, _ = s.debugOut.Write(debug.Stack()) - } - s.extender = makePanicExtender(p) - b.toShutdown = !b.toShutdown - b.recoveredPanic = p + if s.debugOut != nil { + fmt.Fprintln(s.debugOut, p) + _, _ = s.debugOut.Write(debug.Stack()) } - reader, lines := s.extender(nil, s.reqWidth, stat) - b.frameCh <- &frame{reader, lines + 1} + s.aborted = !s.completed + s.extender = makePanicExtender(p) + reader, lines = s.extender(nil, s.reqWidth, stat) + b.recoveredPanic = p + } + frame := renderFrame{ + reader: reader, + lines: lines + 1, + shutdown: s.completed || s.aborted, } - s.completeFlushed = s.completed + if frame.shutdown { + b.cancel() + } + b.frameCh <- &frame }() - reader, lines := s.extender(s.draw(stat), s.reqWidth, stat) - b.toShutdown = s.completed && !s.completeFlushed - b.frameCh <- &frame{reader, lines + 1} + if b.recoveredPanic == nil { + reader = s.draw(stat) + } + reader, lines = s.extender(reader, s.reqWidth, stat) }: case <-b.done: - s := b.cacheState - stat := newStatistics(tw, s) - var r io.Reader + var reader io.Reader + var lines int + stat, s := newStatistics(tw, b.bs), b.bs if b.recoveredPanic == nil { - r = s.draw(stat) - } - reader, lines := s.extender(r, s.reqWidth, stat) - b.frameCh <- &frame{reader, lines + 1} - } -} - -func (b *Bar) subscribeDecorators() { - var averageDecorators []decor.AverageDecorator - var ewmaDecorators []decor.EwmaDecorator - var shutdownListeners []decor.ShutdownListener - b.TraverseDecorators(func(d decor.Decorator) { - if d, ok := d.(decor.AverageDecorator); ok { - averageDecorators = append(averageDecorators, d) - } - if d, ok := d.(decor.EwmaDecorator); ok { - ewmaDecorators = append(ewmaDecorators, d) + reader = s.draw(stat) } - if d, ok := d.(decor.ShutdownListener); ok { - shutdownListeners = append(shutdownListeners, d) + reader, lines = s.extender(reader, s.reqWidth, stat) + b.frameCh <- &renderFrame{ + reader: reader, + lines: lines + 1, } - }) - b.hasEwmaDecorators = len(ewmaDecorators) != 0 - select { - case b.operateState <- func(s *bState) { - s.averageDecorators = averageDecorators - s.ewmaDecorators = ewmaDecorators - s.shutdownListeners = shutdownListeners - }: - case <-b.done: } } -func (b *Bar) forceRefreshIfLastUncompleted() { - var uncompleted int +func (b *Bar) forceRefresh() { + var anyOtherRunning bool b.container.traverseBars(func(bar *Bar) bool { - if b != bar && !bar.Completed() { - uncompleted++ - return false - } - return true + anyOtherRunning = b != bar && bar.isRunning() + return !anyOtherRunning }) - if uncompleted == 0 { + if !anyOtherRunning { for { select { case b.container.refreshCh <- time.Now(): + time.Sleep(prr) case <-b.done: return } @@ -417,13 +422,25 @@ func (b *Bar) forceRefreshIfLastUncompleted() { } } +func (b *Bar) isRunning() bool { + result := make(chan bool) + select { + case b.operateState <- func(s *bState) { + result <- !s.completed && !s.aborted + }: + return <-result + case <-b.done: + return false + } +} + func (b *Bar) wSyncTable() [][]chan int { result := make(chan [][]chan int) select { case b.operateState <- func(s *bState) { result <- s.wSyncTable() }: return <-result case <-b.done: - return b.cacheState.wSyncTable() + return b.bs.wSyncTable() } } @@ -487,6 +504,26 @@ func (s *bState) wSyncTable() [][]chan int { return table } +func (s *bState) subscribeDecorators() { + for _, decorators := range [...][]decor.Decorator{ + s.pDecorators, + s.aDecorators, + } { + for _, d := range decorators { + d = extractBaseDecorator(d) + if d, ok := d.(decor.AverageDecorator); ok { + s.averageDecorators = append(s.averageDecorators, d) + } + if d, ok := d.(decor.EwmaDecorator); ok { + s.ewmaDecorators = append(s.ewmaDecorators, d) + } + if d, ok := d.(decor.ShutdownListener); ok { + s.shutdownListeners = append(s.shutdownListeners, d) + } + } + } +} + func (s bState) decoratorEwmaUpdate(dur time.Duration) { wg := new(sync.WaitGroup) for i := 0; i < len(s.ewmaDecorators); i++ { @@ -540,12 +577,12 @@ func (s bState) decoratorShutdownNotify() { func newStatistics(tw int, s *bState) decor.Statistics { return decor.Statistics{ - ID: s.id, AvailableWidth: tw, + ID: s.id, Total: s.total, Current: s.current, Refill: s.refill, - Completed: s.completeFlushed, + Completed: s.completed, Aborted: s.aborted, } } |