172 lines
5.2 KiB
Python
172 lines
5.2 KiB
Python
# -*- coding: utf-8 -*-
|
|
|
|
"""
|
|
clint.textui.progress
|
|
~~~~~~~~~~~~~~~~~
|
|
|
|
This module provides the progressbar functionality.
|
|
|
|
"""
|
|
|
|
from __future__ import absolute_import
|
|
|
|
import sys
|
|
import time
|
|
|
|
STREAM = sys.stderr
|
|
|
|
BAR_TEMPLATE = '%s[%s%s] %i/%i - %s\r'
|
|
MILL_TEMPLATE = '%s %s %i/%i\r'
|
|
|
|
DOTS_CHAR = '.'
|
|
BAR_FILLED_CHAR = '#'
|
|
BAR_EMPTY_CHAR = ' '
|
|
MILL_CHARS = ['|', '/', '-', '\\']
|
|
|
|
# How long to wait before recalculating the ETA
|
|
ETA_INTERVAL = 1
|
|
# How many intervals (excluding the current one) to calculate the simple moving
|
|
# average
|
|
ETA_SMA_WINDOW = 9
|
|
|
|
|
|
class Bar(object):
|
|
def __enter__(self):
|
|
return self
|
|
|
|
def __exit__(self, exc_type, exc_val, exc_tb):
|
|
self.done()
|
|
return False # we're not suppressing exceptions
|
|
|
|
def __init__(self, label='', width=32, hide=None, empty_char=BAR_EMPTY_CHAR,
|
|
filled_char=BAR_FILLED_CHAR, expected_size=None, every=1):
|
|
self.label = label
|
|
self.width = width
|
|
self.hide = hide
|
|
# Only show bar in terminals by default (better for piping, logging etc.)
|
|
if hide is None:
|
|
try:
|
|
self.hide = not STREAM.isatty()
|
|
except AttributeError: # output does not support isatty()
|
|
self.hide = True
|
|
self.empty_char = empty_char
|
|
self.filled_char = filled_char
|
|
self.expected_size = expected_size
|
|
self.every = every
|
|
self.start = time.time()
|
|
self.ittimes = []
|
|
self.eta = 0
|
|
self.etadelta = time.time()
|
|
self.etadisp = self.format_time(self.eta)
|
|
self.last_progress = 0
|
|
if (self.expected_size):
|
|
self.show(0)
|
|
|
|
def show(self, progress, count=None):
|
|
if count is not None:
|
|
self.expected_size = count
|
|
if self.expected_size is None:
|
|
raise Exception("expected_size not initialized")
|
|
self.last_progress = progress
|
|
if (time.time() - self.etadelta) > ETA_INTERVAL:
|
|
self.etadelta = time.time()
|
|
self.ittimes = \
|
|
self.ittimes[-ETA_SMA_WINDOW:] + \
|
|
[-(self.start - time.time()) / (progress+1)]
|
|
self.eta = \
|
|
sum(self.ittimes) / float(len(self.ittimes)) * \
|
|
(self.expected_size - progress)
|
|
self.etadisp = self.format_time(self.eta)
|
|
x = int(self.width * progress / self.expected_size)
|
|
if not self.hide:
|
|
if ((progress % self.every) == 0 or # True every "every" updates
|
|
(progress == self.expected_size)): # And when we're done
|
|
STREAM.write(BAR_TEMPLATE % (
|
|
self.label, self.filled_char * x,
|
|
self.empty_char * (self.width - x), progress,
|
|
self.expected_size, self.etadisp))
|
|
STREAM.flush()
|
|
|
|
def done(self):
|
|
self.elapsed = time.time() - self.start
|
|
elapsed_disp = self.format_time(self.elapsed)
|
|
if not self.hide:
|
|
# Print completed bar with elapsed time
|
|
STREAM.write(BAR_TEMPLATE % (
|
|
self.label, self.filled_char * self.width,
|
|
self.empty_char * 0, self.last_progress,
|
|
self.expected_size, elapsed_disp))
|
|
STREAM.write('\n')
|
|
STREAM.flush()
|
|
|
|
def format_time(self, seconds):
|
|
return time.strftime('%H:%M:%S', time.gmtime(seconds))
|
|
|
|
|
|
def bar(it, label='', width=32, hide=None, empty_char=BAR_EMPTY_CHAR,
|
|
filled_char=BAR_FILLED_CHAR, expected_size=None, every=1):
|
|
"""Progress iterator. Wrap your iterables with it."""
|
|
|
|
count = len(it) if expected_size is None else expected_size
|
|
|
|
with Bar(label=label, width=width, hide=hide, empty_char=BAR_EMPTY_CHAR,
|
|
filled_char=BAR_FILLED_CHAR, expected_size=count, every=every) \
|
|
as bar:
|
|
for i, item in enumerate(it):
|
|
yield item
|
|
bar.show(i + 1)
|
|
|
|
|
|
def dots(it, label='', hide=None, every=1):
|
|
"""Progress iterator. Prints a dot for each item being iterated"""
|
|
|
|
count = 0
|
|
|
|
if not hide:
|
|
STREAM.write(label)
|
|
|
|
for i, item in enumerate(it):
|
|
if not hide:
|
|
if i % every == 0: # True every "every" updates
|
|
STREAM.write(DOTS_CHAR)
|
|
sys.stderr.flush()
|
|
|
|
count += 1
|
|
|
|
yield item
|
|
|
|
STREAM.write('\n')
|
|
STREAM.flush()
|
|
|
|
|
|
def mill(it, label='', hide=None, expected_size=None, every=1):
|
|
"""Progress iterator. Prints a mill while iterating over the items."""
|
|
|
|
def _mill_char(_i):
|
|
if _i >= count:
|
|
return ' '
|
|
else:
|
|
return MILL_CHARS[(_i // every) % len(MILL_CHARS)]
|
|
|
|
def _show(_i):
|
|
if not hide:
|
|
if ((_i % every) == 0 or # True every "every" updates
|
|
(_i == count)): # And when we're done
|
|
|
|
STREAM.write(MILL_TEMPLATE % (
|
|
label, _mill_char(_i), _i, count))
|
|
STREAM.flush()
|
|
|
|
count = len(it) if expected_size is None else expected_size
|
|
|
|
if count:
|
|
_show(0)
|
|
|
|
for i, item in enumerate(it):
|
|
yield item
|
|
_show(i + 1)
|
|
|
|
if not hide:
|
|
STREAM.write('\n')
|
|
STREAM.flush()
|