import io
import os
import sys
import math
import logging
import datetime
from . import six
# There might be a better way to get the epoch with tzinfo, please create
# a pull request if you know a better way that functions for Python 2 and 3
epoch = datetime.datetime(year=1970, month=1, day=1)
[docs]class WrappingIO:
def __init__(self, target, capturing=False):
self.buffer = six.StringIO()
self.target = target
self.capturing = capturing
[docs] def write(self, value):
if self.capturing:
self.buffer.write(value)
else:
self.target.write(value)
[docs] def flush(self):
self.buffer.flush()
def _flush(self):
value = self.buffer.getvalue()
if value:
self.flush()
self.target.write(value)
self.buffer.seek(0)
self.buffer.truncate(0)
[docs]class StreamWrapper(object):
'''Wrap stdout and stderr globally'''
def __init__(self):
self.stdout = self.original_stdout = sys.stdout
self.stderr = self.original_stderr = sys.stderr
self.original_excepthook = sys.excepthook
self.wrapped_stdout = 0
self.wrapped_stderr = 0
self.wrapped_excepthook = 0
self.capturing = 0
if os.environ.get('WRAP_STDOUT'): # pragma: no cover
self.wrap_stdout()
if os.environ.get('WRAP_STDERR'): # pragma: no cover
self.wrap_stderr()
[docs] def start_capturing(self):
self.capturing += 1
self.update_capturing()
[docs] def stop_capturing(self):
self.capturing -= 1
self.update_capturing()
[docs] def update_capturing(self): # pragma: no cover
if isinstance(self.stdout, WrappingIO):
self.stdout.capturing = self.capturing > 0
if isinstance(self.stderr, WrappingIO):
self.stderr.capturing = self.capturing > 0
if self.capturing <= 0:
self.flush()
[docs] def wrap(self, stdout=False, stderr=False):
if stdout:
self.wrap_stdout()
if stderr:
self.wrap_stderr()
[docs] def wrap_stdout(self):
self.wrap_excepthook()
if not self.wrapped_stdout:
self.stdout = sys.stdout = WrappingIO(self.original_stdout)
self.wrapped_stdout += 1
return sys.stdout
[docs] def wrap_stderr(self):
self.wrap_excepthook()
if not self.wrapped_stderr:
self.stderr = sys.stderr = WrappingIO(self.original_stderr)
self.wrapped_stderr += 1
return sys.stderr
[docs] def unwrap_excepthook(self):
if self.wrapped_excepthook:
self.wrapped_excepthook -= 1
sys.excepthook = self.original_excepthook
[docs] def wrap_excepthook(self):
if not self.wrapped_excepthook:
logger.debug('wrapping excepthook')
self.wrapped_excepthook += 1
sys.excepthook = self.excepthook
[docs] def unwrap(self, stdout=False, stderr=False):
if stdout:
self.unwrap_stdout()
if stderr:
self.unwrap_stderr()
[docs] def unwrap_stdout(self):
if self.wrapped_stdout > 1:
self.wrapped_stdout -= 1
else:
sys.stdout = self.original_stdout
self.wrapped_stdout = 0
[docs] def unwrap_stderr(self):
if self.wrapped_stderr > 1:
self.wrapped_stderr -= 1
else:
sys.stderr = self.original_stderr
self.wrapped_stderr = 0
[docs] def flush(self):
if self.wrapped_stdout: # pragma: no branch
try:
self.stdout._flush()
except (io.UnsupportedOperation,
AttributeError): # pragma: no cover
self.wrapped_stdout = False
logger.warn('Disabling stdout redirection, %r is not seekable',
sys.stdout)
if self.wrapped_stderr: # pragma: no branch
try:
self.stderr._flush()
except (io.UnsupportedOperation,
AttributeError): # pragma: no cover
self.wrapped_stderr = False
logger.warn('Disabling stderr redirection, %r is not seekable',
sys.stderr)
[docs] def excepthook(self, exc_type, exc_value, exc_traceback):
self.original_excepthook(exc_type, exc_value, exc_traceback)
self.flush()
streams = StreamWrapper()
logger = logging.getLogger(__name__)
[docs]def timedelta_to_seconds(delta):
'''Convert a timedelta to seconds with the microseconds as fraction
>>> from datetime import timedelta
>>> '%d' % timedelta_to_seconds(timedelta(days=1))
'86400'
>>> '%d' % timedelta_to_seconds(timedelta(seconds=1))
'1'
>>> '%.6f' % timedelta_to_seconds(timedelta(seconds=1, microseconds=1))
'1.000001'
>>> '%.6f' % timedelta_to_seconds(timedelta(microseconds=1))
'0.000001'
'''
# Only convert to float if needed
if delta.microseconds:
total = delta.microseconds * 1e-6
else:
total = 0
total += delta.seconds
total += delta.days * 60 * 60 * 24
return total
[docs]def scale_1024(x, n_prefixes):
'''Scale a number down to a suitable size, based on powers of 1024.
Returns the scaled number and the power of 1024 used.
Use to format numbers of bytes to KiB, MiB, etc.
>>> scale_1024(310, 3)
(310.0, 0)
>>> scale_1024(2048, 3)
(2.0, 1)
>>> scale_1024(0, 2)
(0.0, 0)
>>> scale_1024(0.5, 2)
(0.5, 0)
>>> scale_1024(1, 2)
(1.0, 0)
'''
if x <= 0:
power = 0
else:
power = min(int(math.log(x, 2) / 10), n_prefixes - 1)
scaled = float(x) / (2 ** (10 * power))
return scaled, power
[docs]def get_terminal_size(): # pragma: no cover
'''Get the current size of your terminal
Multiple returns are not always a good idea, but in this case it greatly
simplifies the code so I believe it's justified. It's not the prettiest
function but that's never really possible with cross-platform code.
Returns:
width, height: Two integers containing width and height
'''
try:
# Default to 79 characters for IPython notebooks
from IPython import get_ipython
ipython = get_ipython()
from ipykernel import zmqshell
if isinstance(ipython, zmqshell.ZMQInteractiveShell):
return 79, 24
except Exception: # pragma: no cover
pass
try:
# This works for Python 3, but not Pypy3. Probably the best method if
# it's supported so let's always try
import shutil
w, h = shutil.get_terminal_size()
if w and h:
# The off by one is needed due to progressbars in some cases, for
# safety we'll always substract it.
return w - 1, h
except Exception: # pragma: no cover
pass
try:
w = int(os.environ.get('COLUMNS'))
h = int(os.environ.get('LINES'))
if w and h:
return w, h
except Exception: # pragma: no cover
pass
try:
import blessings
terminal = blessings.Terminal()
w = terminal.width
h = terminal.height
if w and h:
return w, h
except Exception: # pragma: no cover
pass
try:
w, h = _get_terminal_size_linux()
if w and h:
return w, h
except Exception: # pragma: no cover
pass
try:
# Windows detection doesn't always work, let's try anyhow
w, h = _get_terminal_size_windows()
if w and h:
return w, h
except Exception: # pragma: no cover
pass
try:
# needed for window's python in cygwin's xterm!
w, h = _get_terminal_size_tput()
if w and h:
return w, h
except Exception: # pragma: no cover
pass
return 79, 24
def _get_terminal_size_windows(): # pragma: no cover
res = None
try:
from ctypes import windll, create_string_buffer
# stdin handle is -10
# stdout handle is -11
# stderr handle is -12
h = windll.kernel32.GetStdHandle(-12)
csbi = create_string_buffer(22)
res = windll.kernel32.GetConsoleScreenBufferInfo(h, csbi)
except Exception:
return None
if res:
import struct
(_, _, _, _, _, left, top, right, bottom, _, _) = \
struct.unpack("hhhhHhhhhhh", csbi.raw)
w = right - left
h = bottom - top
return w, h
else:
return None
def _get_terminal_size_tput(): # pragma: no cover
# get terminal width src: http://stackoverflow.com/questions/263890/
try:
import subprocess
proc = subprocess.Popen(
['tput', 'cols'], stdin=subprocess.PIPE, stdout=subprocess.PIPE,
stderr=subprocess.PIPE)
output = proc.communicate(input=None)
w = int(output[0])
proc = subprocess.Popen(
['tput', 'lines'], stdin=subprocess.PIPE, stdout=subprocess.PIPE,
stderr=subprocess.PIPE)
output = proc.communicate(input=None)
h = int(output[0])
return w, h
except Exception:
return None
def _get_terminal_size_linux(): # pragma: no cover
def ioctl_GWINSZ(fd):
try:
import fcntl
import termios
import struct
size = struct.unpack(
'hh', fcntl.ioctl(fd, termios.TIOCGWINSZ, '1234'))
except Exception:
return None
return size
size = ioctl_GWINSZ(0) or ioctl_GWINSZ(1) or ioctl_GWINSZ(2)
if not size:
try:
fd = os.open(os.ctermid(), os.O_RDONLY)
size = ioctl_GWINSZ(fd)
os.close(fd)
except Exception:
pass
if not size:
try:
size = os.environ['LINES'], os.environ['COLUMNS']
except Exception:
return None
return int(size[1]), int(size[0])
[docs]def timestamp(dt): # pragma: no cover
if hasattr(dt, 'timestamp'):
return dt.timestamp()
else:
return (dt - epoch).total_seconds()