backing out Ronny's PR because it was merged too early (still has failing tests)
This commit is contained in:
@@ -1,55 +1,17 @@
|
||||
"""
|
||||
per-test stdout/stderr capturing mechanisms,
|
||||
``capsys`` and ``capfd`` function arguments.
|
||||
"""
|
||||
# note: py.io capture was where copied from
|
||||
# pylib 1.4.20.dev2 (rev 13d9af95547e)
|
||||
""" per-test stdout/stderr capturing mechanisms, ``capsys`` and ``capfd`` function arguments. """
|
||||
|
||||
import pytest, py
|
||||
import sys
|
||||
import os
|
||||
import tempfile
|
||||
|
||||
import py
|
||||
import pytest
|
||||
|
||||
try:
|
||||
from io import StringIO
|
||||
except ImportError:
|
||||
from StringIO import StringIO
|
||||
|
||||
try:
|
||||
from io import BytesIO
|
||||
except ImportError:
|
||||
class BytesIO(StringIO):
|
||||
def write(self, data):
|
||||
if isinstance(data, unicode):
|
||||
raise TypeError("not a byte value: %r" % (data,))
|
||||
StringIO.write(self, data)
|
||||
|
||||
if sys.version_info < (3, 0):
|
||||
class TextIO(StringIO):
|
||||
def write(self, data):
|
||||
if not isinstance(data, unicode):
|
||||
enc = getattr(self, '_encoding', 'UTF-8')
|
||||
data = unicode(data, enc, 'replace')
|
||||
StringIO.write(self, data)
|
||||
else:
|
||||
TextIO = StringIO
|
||||
|
||||
|
||||
patchsysdict = {0: 'stdin', 1: 'stdout', 2: 'stderr'}
|
||||
|
||||
|
||||
def pytest_addoption(parser):
|
||||
group = parser.getgroup("general")
|
||||
group._addoption(
|
||||
'--capture', action="store", default=None,
|
||||
group._addoption('--capture', action="store", default=None,
|
||||
metavar="method", choices=['fd', 'sys', 'no'],
|
||||
help="per-test capturing method: one of fd (default)|sys|no.")
|
||||
group._addoption(
|
||||
'-s', action="store_const", const="no", dest="capture",
|
||||
group._addoption('-s', action="store_const", const="no", dest="capture",
|
||||
help="shortcut for --capture=no.")
|
||||
|
||||
|
||||
@pytest.mark.tryfirst
|
||||
def pytest_load_initial_conftests(early_config, parser, args, __multicall__):
|
||||
ns = parser.parse_known_args(args)
|
||||
@@ -60,16 +22,13 @@ def pytest_load_initial_conftests(early_config, parser, args, __multicall__):
|
||||
method = "sys"
|
||||
capman = CaptureManager(method)
|
||||
early_config.pluginmanager.register(capman, "capturemanager")
|
||||
|
||||
# make sure that capturemanager is properly reset at final shutdown
|
||||
def teardown():
|
||||
try:
|
||||
capman.reset_capturings()
|
||||
except ValueError:
|
||||
pass
|
||||
|
||||
early_config.pluginmanager.add_shutdown(teardown)
|
||||
|
||||
# make sure logging does not raise exceptions at the end
|
||||
def silence_logging_at_shutdown():
|
||||
if "logging" in sys.modules:
|
||||
@@ -88,27 +47,21 @@ def pytest_load_initial_conftests(early_config, parser, args, __multicall__):
|
||||
sys.stderr.write(err)
|
||||
raise
|
||||
|
||||
|
||||
def addouterr(rep, outerr):
|
||||
for secname, content in zip(["out", "err"], outerr):
|
||||
if content:
|
||||
rep.sections.append(("Captured std%s" % secname, content))
|
||||
|
||||
|
||||
class NoCapture:
|
||||
def startall(self):
|
||||
pass
|
||||
|
||||
def resume(self):
|
||||
pass
|
||||
|
||||
def reset(self):
|
||||
pass
|
||||
|
||||
def suspend(self):
|
||||
return "", ""
|
||||
|
||||
|
||||
class CaptureManager:
|
||||
def __init__(self, defaultmethod=None):
|
||||
self._method2capture = {}
|
||||
@@ -116,25 +69,21 @@ class CaptureManager:
|
||||
|
||||
def _maketempfile(self):
|
||||
f = py.std.tempfile.TemporaryFile()
|
||||
newf = dupfile(f, encoding="UTF-8")
|
||||
newf = py.io.dupfile(f, encoding="UTF-8")
|
||||
f.close()
|
||||
return newf
|
||||
|
||||
def _makestringio(self):
|
||||
return TextIO()
|
||||
return py.io.TextIO()
|
||||
|
||||
def _getcapture(self, method):
|
||||
if method == "fd":
|
||||
return StdCaptureFD(
|
||||
now=False,
|
||||
out=self._maketempfile(),
|
||||
err=self._maketempfile(),
|
||||
return py.io.StdCaptureFD(now=False,
|
||||
out=self._maketempfile(), err=self._maketempfile()
|
||||
)
|
||||
elif method == "sys":
|
||||
return StdCapture(
|
||||
now=False,
|
||||
out=self._makestringio(),
|
||||
err=self._makestringio(),
|
||||
return py.io.StdCapture(now=False,
|
||||
out=self._makestringio(), err=self._makestringio()
|
||||
)
|
||||
elif method == "no":
|
||||
return NoCapture()
|
||||
@@ -149,24 +98,23 @@ class CaptureManager:
|
||||
method = config._conftest.rget("option_capture", path=fspath)
|
||||
except KeyError:
|
||||
method = "fd"
|
||||
if method == "fd" and not hasattr(os, 'dup'): # e.g. jython
|
||||
if method == "fd" and not hasattr(os, 'dup'): # e.g. jython
|
||||
method = "sys"
|
||||
return method
|
||||
|
||||
def reset_capturings(self):
|
||||
for cap in self._method2capture.values():
|
||||
for name, cap in self._method2capture.items():
|
||||
cap.reset()
|
||||
|
||||
def resumecapture_item(self, item):
|
||||
method = self._getmethod(item.config, item.fspath)
|
||||
if not hasattr(item, 'outerr'):
|
||||
item.outerr = ('', '') # we accumulate outerr on the item
|
||||
item.outerr = ('', '') # we accumulate outerr on the item
|
||||
return self.resumecapture(method)
|
||||
|
||||
def resumecapture(self, method=None):
|
||||
if hasattr(self, '_capturing'):
|
||||
raise ValueError(
|
||||
"cannot resume, already capturing with %r" %
|
||||
raise ValueError("cannot resume, already capturing with %r" %
|
||||
(self._capturing,))
|
||||
if method is None:
|
||||
method = self._defaultmethod
|
||||
@@ -215,9 +163,8 @@ class CaptureManager:
|
||||
try:
|
||||
self.resumecapture(method)
|
||||
except ValueError:
|
||||
# recursive collect, XXX refactor capturing
|
||||
# to allow for more lightweight recursive capturing
|
||||
return
|
||||
return # recursive collect, XXX refactor capturing
|
||||
# to allow for more lightweight recursive capturing
|
||||
try:
|
||||
rep = __multicall__.execute()
|
||||
finally:
|
||||
@@ -258,7 +205,6 @@ class CaptureManager:
|
||||
|
||||
error_capsysfderror = "cannot use capsys and capfd at the same time"
|
||||
|
||||
|
||||
def pytest_funcarg__capsys(request):
|
||||
"""enables capturing of writes to sys.stdout/sys.stderr and makes
|
||||
captured output available via ``capsys.readouterr()`` method calls
|
||||
@@ -266,8 +212,7 @@ def pytest_funcarg__capsys(request):
|
||||
"""
|
||||
if "capfd" in request._funcargs:
|
||||
raise request.raiseerror(error_capsysfderror)
|
||||
return CaptureFixture(StdCapture)
|
||||
|
||||
return CaptureFixture(py.io.StdCapture)
|
||||
|
||||
def pytest_funcarg__capfd(request):
|
||||
"""enables capturing of writes to file descriptors 1 and 2 and makes
|
||||
@@ -278,366 +223,26 @@ def pytest_funcarg__capfd(request):
|
||||
request.raiseerror(error_capsysfderror)
|
||||
if not hasattr(os, 'dup'):
|
||||
pytest.skip("capfd funcarg needs os.dup")
|
||||
return CaptureFixture(StdCaptureFD)
|
||||
|
||||
return CaptureFixture(py.io.StdCaptureFD)
|
||||
|
||||
class CaptureFixture:
|
||||
def __init__(self, captureclass):
|
||||
self._capture = captureclass(now=False)
|
||||
self.capture = captureclass(now=False)
|
||||
|
||||
def _start(self):
|
||||
self._capture.startall()
|
||||
self.capture.startall()
|
||||
|
||||
def _finalize(self):
|
||||
if hasattr(self, 'capture'):
|
||||
outerr = self._outerr = self._capture.reset()
|
||||
del self._capture
|
||||
outerr = self._outerr = self.capture.reset()
|
||||
del self.capture
|
||||
return outerr
|
||||
|
||||
def readouterr(self):
|
||||
try:
|
||||
return self._capture.readouterr()
|
||||
return self.capture.readouterr()
|
||||
except AttributeError:
|
||||
return self._outerr
|
||||
|
||||
def close(self):
|
||||
self._finalize()
|
||||
|
||||
|
||||
class FDCapture:
|
||||
""" Capture IO to/from a given os-level filedescriptor. """
|
||||
|
||||
def __init__(self, targetfd, tmpfile=None, now=True, patchsys=False):
|
||||
""" save targetfd descriptor, and open a new
|
||||
temporary file there. If no tmpfile is
|
||||
specified a tempfile.Tempfile() will be opened
|
||||
in text mode.
|
||||
"""
|
||||
self.targetfd = targetfd
|
||||
if tmpfile is None and targetfd != 0:
|
||||
f = tempfile.TemporaryFile('wb+')
|
||||
tmpfile = dupfile(f, encoding="UTF-8")
|
||||
f.close()
|
||||
self.tmpfile = tmpfile
|
||||
self._savefd = os.dup(self.targetfd)
|
||||
if patchsys:
|
||||
self._oldsys = getattr(sys, patchsysdict[targetfd])
|
||||
if now:
|
||||
self.start()
|
||||
|
||||
def start(self):
|
||||
try:
|
||||
os.fstat(self._savefd)
|
||||
except OSError:
|
||||
raise ValueError(
|
||||
"saved filedescriptor not valid, "
|
||||
"did you call start() twice?")
|
||||
if self.targetfd == 0 and not self.tmpfile:
|
||||
fd = os.open(os.devnull, os.O_RDONLY)
|
||||
os.dup2(fd, 0)
|
||||
os.close(fd)
|
||||
if hasattr(self, '_oldsys'):
|
||||
setattr(sys, patchsysdict[self.targetfd], DontReadFromInput())
|
||||
else:
|
||||
os.dup2(self.tmpfile.fileno(), self.targetfd)
|
||||
if hasattr(self, '_oldsys'):
|
||||
setattr(sys, patchsysdict[self.targetfd], self.tmpfile)
|
||||
|
||||
def done(self):
|
||||
""" unpatch and clean up, returns the self.tmpfile (file object)
|
||||
"""
|
||||
os.dup2(self._savefd, self.targetfd)
|
||||
os.close(self._savefd)
|
||||
if self.targetfd != 0:
|
||||
self.tmpfile.seek(0)
|
||||
if hasattr(self, '_oldsys'):
|
||||
setattr(sys, patchsysdict[self.targetfd], self._oldsys)
|
||||
return self.tmpfile
|
||||
|
||||
def writeorg(self, data):
|
||||
""" write a string to the original file descriptor
|
||||
"""
|
||||
tempfp = tempfile.TemporaryFile()
|
||||
try:
|
||||
os.dup2(self._savefd, tempfp.fileno())
|
||||
tempfp.write(data)
|
||||
finally:
|
||||
tempfp.close()
|
||||
|
||||
|
||||
def dupfile(f, mode=None, buffering=0, raising=False, encoding=None):
|
||||
""" return a new open file object that's a duplicate of f
|
||||
|
||||
mode is duplicated if not given, 'buffering' controls
|
||||
buffer size (defaulting to no buffering) and 'raising'
|
||||
defines whether an exception is raised when an incompatible
|
||||
file object is passed in (if raising is False, the file
|
||||
object itself will be returned)
|
||||
"""
|
||||
try:
|
||||
fd = f.fileno()
|
||||
mode = mode or f.mode
|
||||
except AttributeError:
|
||||
if raising:
|
||||
raise
|
||||
return f
|
||||
newfd = os.dup(fd)
|
||||
if sys.version_info >= (3, 0):
|
||||
if encoding is not None:
|
||||
mode = mode.replace("b", "")
|
||||
buffering = True
|
||||
return os.fdopen(newfd, mode, buffering, encoding, closefd=True)
|
||||
else:
|
||||
f = os.fdopen(newfd, mode, buffering)
|
||||
if encoding is not None:
|
||||
return EncodedFile(f, encoding)
|
||||
return f
|
||||
|
||||
|
||||
class EncodedFile(object):
|
||||
def __init__(self, _stream, encoding):
|
||||
self._stream = _stream
|
||||
self.encoding = encoding
|
||||
|
||||
def write(self, obj):
|
||||
if isinstance(obj, unicode):
|
||||
obj = obj.encode(self.encoding)
|
||||
self._stream.write(obj)
|
||||
|
||||
def writelines(self, linelist):
|
||||
data = ''.join(linelist)
|
||||
self.write(data)
|
||||
|
||||
def __getattr__(self, name):
|
||||
return getattr(self._stream, name)
|
||||
|
||||
|
||||
class Capture(object):
|
||||
def call(cls, func, *args, **kwargs):
|
||||
""" return a (res, out, err) tuple where
|
||||
out and err represent the output/error output
|
||||
during function execution.
|
||||
call the given function with args/kwargs
|
||||
and capture output/error during its execution.
|
||||
"""
|
||||
so = cls()
|
||||
try:
|
||||
res = func(*args, **kwargs)
|
||||
finally:
|
||||
out, err = so.reset()
|
||||
return res, out, err
|
||||
call = classmethod(call)
|
||||
|
||||
def reset(self):
|
||||
""" reset sys.stdout/stderr and return captured output as strings. """
|
||||
if hasattr(self, '_reset'):
|
||||
raise ValueError("was already reset")
|
||||
self._reset = True
|
||||
outfile, errfile = self.done(save=False)
|
||||
out, err = "", ""
|
||||
if outfile and not outfile.closed:
|
||||
out = outfile.read()
|
||||
outfile.close()
|
||||
if errfile and errfile != outfile and not errfile.closed:
|
||||
err = errfile.read()
|
||||
errfile.close()
|
||||
return out, err
|
||||
|
||||
def suspend(self):
|
||||
""" return current snapshot captures, memorize tempfiles. """
|
||||
outerr = self.readouterr()
|
||||
outfile, errfile = self.done()
|
||||
return outerr
|
||||
|
||||
|
||||
class StdCaptureFD(Capture):
|
||||
""" This class allows to capture writes to FD1 and FD2
|
||||
and may connect a NULL file to FD0 (and prevent
|
||||
reads from sys.stdin). If any of the 0,1,2 file descriptors
|
||||
is invalid it will not be captured.
|
||||
"""
|
||||
def __init__(self, out=True, err=True, mixed=False,
|
||||
in_=True, patchsys=True, now=True):
|
||||
self._options = {
|
||||
"out": out,
|
||||
"err": err,
|
||||
"mixed": mixed,
|
||||
"in_": in_,
|
||||
"patchsys": patchsys,
|
||||
"now": now,
|
||||
}
|
||||
self._save()
|
||||
if now:
|
||||
self.startall()
|
||||
|
||||
def _save(self):
|
||||
in_ = self._options['in_']
|
||||
out = self._options['out']
|
||||
err = self._options['err']
|
||||
mixed = self._options['mixed']
|
||||
patchsys = self._options['patchsys']
|
||||
if in_:
|
||||
try:
|
||||
self.in_ = FDCapture(
|
||||
0, tmpfile=None, now=False,
|
||||
patchsys=patchsys)
|
||||
except OSError:
|
||||
pass
|
||||
if out:
|
||||
tmpfile = None
|
||||
if hasattr(out, 'write'):
|
||||
tmpfile = out
|
||||
try:
|
||||
self.out = FDCapture(
|
||||
1, tmpfile=tmpfile,
|
||||
now=False, patchsys=patchsys)
|
||||
self._options['out'] = self.out.tmpfile
|
||||
except OSError:
|
||||
pass
|
||||
if err:
|
||||
if out and mixed:
|
||||
tmpfile = self.out.tmpfile
|
||||
elif hasattr(err, 'write'):
|
||||
tmpfile = err
|
||||
else:
|
||||
tmpfile = None
|
||||
try:
|
||||
self.err = FDCapture(
|
||||
2, tmpfile=tmpfile,
|
||||
now=False, patchsys=patchsys)
|
||||
self._options['err'] = self.err.tmpfile
|
||||
except OSError:
|
||||
pass
|
||||
|
||||
def startall(self):
|
||||
if hasattr(self, 'in_'):
|
||||
self.in_.start()
|
||||
if hasattr(self, 'out'):
|
||||
self.out.start()
|
||||
if hasattr(self, 'err'):
|
||||
self.err.start()
|
||||
|
||||
def resume(self):
|
||||
""" resume capturing with original temp files. """
|
||||
self.startall()
|
||||
|
||||
def done(self, save=True):
|
||||
""" return (outfile, errfile) and stop capturing. """
|
||||
outfile = errfile = None
|
||||
if hasattr(self, 'out') and not self.out.tmpfile.closed:
|
||||
outfile = self.out.done()
|
||||
if hasattr(self, 'err') and not self.err.tmpfile.closed:
|
||||
errfile = self.err.done()
|
||||
if hasattr(self, 'in_'):
|
||||
self.in_.done()
|
||||
if save:
|
||||
self._save()
|
||||
return outfile, errfile
|
||||
|
||||
def readouterr(self):
|
||||
""" return snapshot value of stdout/stderr capturings. """
|
||||
out = self._readsnapshot('out')
|
||||
err = self._readsnapshot('err')
|
||||
return out, err
|
||||
|
||||
def _readsnapshot(self, name):
|
||||
if hasattr(self, name):
|
||||
f = getattr(self, name).tmpfile
|
||||
else:
|
||||
return ''
|
||||
|
||||
f.seek(0)
|
||||
res = f.read()
|
||||
enc = getattr(f, "encoding", None)
|
||||
if enc:
|
||||
res = py.builtin._totext(res, enc, "replace")
|
||||
f.truncate(0)
|
||||
f.seek(0)
|
||||
return res
|
||||
|
||||
|
||||
class StdCapture(Capture):
|
||||
""" This class allows to capture writes to sys.stdout|stderr "in-memory"
|
||||
and will raise errors on tries to read from sys.stdin. It only
|
||||
modifies sys.stdout|stderr|stdin attributes and does not
|
||||
touch underlying File Descriptors (use StdCaptureFD for that).
|
||||
"""
|
||||
def __init__(self, out=True, err=True, in_=True, mixed=False, now=True):
|
||||
self._oldout = sys.stdout
|
||||
self._olderr = sys.stderr
|
||||
self._oldin = sys.stdin
|
||||
if out and not hasattr(out, 'file'):
|
||||
out = TextIO()
|
||||
self.out = out
|
||||
if err:
|
||||
if mixed:
|
||||
err = out
|
||||
elif not hasattr(err, 'write'):
|
||||
err = TextIO()
|
||||
self.err = err
|
||||
self.in_ = in_
|
||||
if now:
|
||||
self.startall()
|
||||
|
||||
def startall(self):
|
||||
if self.out:
|
||||
sys.stdout = self.out
|
||||
if self.err:
|
||||
sys.stderr = self.err
|
||||
if self.in_:
|
||||
sys.stdin = self.in_ = DontReadFromInput()
|
||||
|
||||
def done(self, save=True):
|
||||
""" return (outfile, errfile) and stop capturing. """
|
||||
outfile = errfile = None
|
||||
if self.out and not self.out.closed:
|
||||
sys.stdout = self._oldout
|
||||
outfile = self.out
|
||||
outfile.seek(0)
|
||||
if self.err and not self.err.closed:
|
||||
sys.stderr = self._olderr
|
||||
errfile = self.err
|
||||
errfile.seek(0)
|
||||
if self.in_:
|
||||
sys.stdin = self._oldin
|
||||
return outfile, errfile
|
||||
|
||||
def resume(self):
|
||||
""" resume capturing with original temp files. """
|
||||
self.startall()
|
||||
|
||||
def readouterr(self):
|
||||
""" return snapshot value of stdout/stderr capturings. """
|
||||
out = err = ""
|
||||
if self.out:
|
||||
out = self.out.getvalue()
|
||||
self.out.truncate(0)
|
||||
self.out.seek(0)
|
||||
if self.err:
|
||||
err = self.err.getvalue()
|
||||
self.err.truncate(0)
|
||||
self.err.seek(0)
|
||||
return out, err
|
||||
|
||||
|
||||
class DontReadFromInput:
|
||||
"""Temporary stub class. Ideally when stdin is accessed, the
|
||||
capturing should be turned off, with possibly all data captured
|
||||
so far sent to the screen. This should be configurable, though,
|
||||
because in automated test runs it is better to crash than
|
||||
hang indefinitely.
|
||||
"""
|
||||
def read(self, *args):
|
||||
raise IOError("reading from stdin while output is captured")
|
||||
readline = read
|
||||
readlines = read
|
||||
__iter__ = read
|
||||
|
||||
def fileno(self):
|
||||
raise ValueError("redirected Stdin is pseudofile, has no fileno()")
|
||||
|
||||
def isatty(self):
|
||||
return False
|
||||
|
||||
def close(self):
|
||||
pass
|
||||
|
||||
Reference in New Issue
Block a user