Merge branch 'master' of github.com:pytest-dev/pytest into fix-3854

This commit is contained in:
turturica
2018-08-22 20:36:52 -07:00
44 changed files with 595 additions and 157 deletions

View File

@@ -719,7 +719,9 @@ class FormattedExcinfo(object):
repr_chain = []
e = excinfo.value
descr = None
while e is not None:
seen = set()
while e is not None and id(e) not in seen:
seen.add(id(e))
if excinfo:
reprtraceback = self.repr_traceback(excinfo)
reprcrash = excinfo._getreprcrash()

View File

@@ -16,7 +16,6 @@ import six
import pytest
from _pytest.compat import CaptureIO
patchsysdict = {0: "stdin", 1: "stdout", 2: "stderr"}
@@ -63,8 +62,9 @@ def pytest_load_initial_conftests(early_config, parser, args):
# finally trigger conftest loading but while capturing (issue93)
capman.start_global_capturing()
outcome = yield
out, err = capman.suspend_global_capture()
capman.suspend_global_capture()
if outcome.excinfo is not None:
out, err = capman.read_global_capture()
sys.stdout.write(out)
sys.stderr.write(err)
@@ -85,6 +85,7 @@ class CaptureManager(object):
def __init__(self, method):
self._method = method
self._global_capturing = None
self._current_item = None
def _getcapture(self, method):
if method == "fd":
@@ -96,6 +97,8 @@ class CaptureManager(object):
else:
raise ValueError("unknown capturing method: %r" % method)
# Global capturing control
def start_global_capturing(self):
assert self._global_capturing is None
self._global_capturing = self._getcapture(self._method)
@@ -110,16 +113,15 @@ class CaptureManager(object):
def resume_global_capture(self):
self._global_capturing.resume_capturing()
def suspend_global_capture(self, item=None, in_=False):
if item is not None:
self.deactivate_fixture(item)
def suspend_global_capture(self, in_=False):
cap = getattr(self, "_global_capturing", None)
if cap is not None:
try:
outerr = cap.readouterr()
finally:
cap.suspend_capturing(in_=in_)
return outerr
cap.suspend_capturing(in_=in_)
def read_global_capture(self):
return self._global_capturing.readouterr()
# Fixture Control (its just forwarding, think about removing this later)
def activate_fixture(self, item):
"""If the current item is using ``capsys`` or ``capfd``, activate them so they take precedence over
@@ -135,12 +137,53 @@ class CaptureManager(object):
if fixture is not None:
fixture.close()
def suspend_fixture(self, item):
fixture = getattr(item, "_capture_fixture", None)
if fixture is not None:
fixture._suspend()
def resume_fixture(self, item):
fixture = getattr(item, "_capture_fixture", None)
if fixture is not None:
fixture._resume()
# Helper context managers
@contextlib.contextmanager
def global_and_fixture_disabled(self):
"""Context manager to temporarily disables global and current fixture capturing."""
# Need to undo local capsys-et-al if exists before disabling global capture
self.suspend_fixture(self._current_item)
self.suspend_global_capture(in_=False)
try:
yield
finally:
self.resume_global_capture()
self.resume_fixture(self._current_item)
@contextlib.contextmanager
def item_capture(self, when, item):
self.resume_global_capture()
self.activate_fixture(item)
try:
yield
finally:
self.deactivate_fixture(item)
self.suspend_global_capture(in_=False)
out, err = self.read_global_capture()
item.add_report_section(when, "stdout", out)
item.add_report_section(when, "stderr", err)
# Hooks
@pytest.hookimpl(hookwrapper=True)
def pytest_make_collect_report(self, collector):
if isinstance(collector, pytest.File):
self.resume_global_capture()
outcome = yield
out, err = self.suspend_global_capture()
self.suspend_global_capture()
out, err = self.read_global_capture()
rep = outcome.get_result()
if out:
rep.sections.append(("Captured stdout", out))
@@ -150,29 +193,25 @@ class CaptureManager(object):
yield
@pytest.hookimpl(hookwrapper=True)
def pytest_runtest_setup(self, item):
self.resume_global_capture()
# no need to activate a capture fixture because they activate themselves during creation; this
# only makes sense when a fixture uses a capture fixture, otherwise the capture fixture will
# be activated during pytest_runtest_call
def pytest_runtest_protocol(self, item):
self._current_item = item
yield
self.suspend_capture_item(item, "setup")
self._current_item = None
@pytest.hookimpl(hookwrapper=True)
def pytest_runtest_setup(self, item):
with self.item_capture("setup", item):
yield
@pytest.hookimpl(hookwrapper=True)
def pytest_runtest_call(self, item):
self.resume_global_capture()
# it is important to activate this fixture during the call phase so it overwrites the "global"
# capture
self.activate_fixture(item)
yield
self.suspend_capture_item(item, "call")
with self.item_capture("call", item):
yield
@pytest.hookimpl(hookwrapper=True)
def pytest_runtest_teardown(self, item):
self.resume_global_capture()
self.activate_fixture(item)
yield
self.suspend_capture_item(item, "teardown")
with self.item_capture("teardown", item):
yield
@pytest.hookimpl(tryfirst=True)
def pytest_keyboard_interrupt(self, excinfo):
@@ -182,11 +221,6 @@ class CaptureManager(object):
def pytest_internalerror(self, excinfo):
self.stop_global_capturing()
def suspend_capture_item(self, item, when, in_=False):
out, err = self.suspend_global_capture(item, in_=in_)
item.add_report_section(when, "stdout", out)
item.add_report_section(when, "stderr", err)
capture_fixtures = {"capfd", "capfdbinary", "capsys", "capsysbinary"}
@@ -290,40 +324,54 @@ class CaptureFixture(object):
def __init__(self, captureclass, request):
self.captureclass = captureclass
self.request = request
self._capture = None
self._captured_out = self.captureclass.EMPTY_BUFFER
self._captured_err = self.captureclass.EMPTY_BUFFER
def _start(self):
self._capture = MultiCapture(
out=True, err=True, in_=False, Capture=self.captureclass
)
self._capture.start_capturing()
# Start if not started yet
if getattr(self, "_capture", None) is None:
self._capture = MultiCapture(
out=True, err=True, in_=False, Capture=self.captureclass
)
self._capture.start_capturing()
def close(self):
cap = self.__dict__.pop("_capture", None)
if cap is not None:
self._outerr = cap.pop_outerr_to_orig()
cap.stop_capturing()
if self._capture is not None:
out, err = self._capture.pop_outerr_to_orig()
self._captured_out += out
self._captured_err += err
self._capture.stop_capturing()
self._capture = None
def readouterr(self):
"""Read and return the captured output so far, resetting the internal buffer.
:return: captured content as a namedtuple with ``out`` and ``err`` string attributes
"""
try:
return self._capture.readouterr()
except AttributeError:
return self._outerr
captured_out, captured_err = self._captured_out, self._captured_err
if self._capture is not None:
out, err = self._capture.readouterr()
captured_out += out
captured_err += err
self._captured_out = self.captureclass.EMPTY_BUFFER
self._captured_err = self.captureclass.EMPTY_BUFFER
return CaptureResult(captured_out, captured_err)
def _suspend(self):
"""Suspends this fixture's own capturing temporarily."""
self._capture.suspend_capturing()
def _resume(self):
"""Resumes this fixture's own capturing temporarily."""
self._capture.resume_capturing()
@contextlib.contextmanager
def disabled(self):
"""Temporarily disables capture while inside the 'with' block."""
self._capture.suspend_capturing()
capmanager = self.request.config.pluginmanager.getplugin("capturemanager")
capmanager.suspend_global_capture(item=None, in_=False)
try:
with capmanager.global_and_fixture_disabled():
yield
finally:
capmanager.resume_global_capture()
self._capture.resume_capturing()
def safe_text_dupfile(f, mode, default_encoding="UTF8"):
@@ -440,6 +488,7 @@ class MultiCapture(object):
class NoCapture(object):
EMPTY_BUFFER = None
__init__ = start = done = suspend = resume = lambda *args: None
@@ -449,6 +498,8 @@ class FDCaptureBinary(object):
snap() produces `bytes`
"""
EMPTY_BUFFER = bytes()
def __init__(self, targetfd, tmpfile=None):
self.targetfd = targetfd
try:
@@ -522,6 +573,8 @@ class FDCapture(FDCaptureBinary):
snap() produces text
"""
EMPTY_BUFFER = str()
def snap(self):
res = FDCaptureBinary.snap(self)
enc = getattr(self.tmpfile, "encoding", None)
@@ -531,6 +584,9 @@ class FDCapture(FDCaptureBinary):
class SysCapture(object):
EMPTY_BUFFER = str()
def __init__(self, fd, tmpfile=None):
name = patchsysdict[fd]
self._old = getattr(sys, name)
@@ -568,6 +624,8 @@ class SysCapture(object):
class SysCaptureBinary(SysCapture):
EMPTY_BUFFER = bytes()
def snap(self):
res = self.tmpfile.buffer.getvalue()
self.tmpfile.seek(0)

View File

@@ -8,6 +8,7 @@ import functools
import inspect
import re
import sys
from contextlib import contextmanager
import py
@@ -151,6 +152,13 @@ def getfuncargnames(function, is_method=False, cls=None):
return arg_names
@contextmanager
def dummy_context_manager():
"""Context manager that does nothing, useful in situations where you might need an actual context manager or not
depending on some condition. Using this allow to keep the same code"""
yield
def get_default_arg_names(function):
# Note: this code intentionally mirrors the code at the beginning of getfuncargnames,
# to get the arguments which were excluded from its result because they had default values

View File

@@ -174,23 +174,23 @@ class Argument(object):
if isinstance(typ, six.string_types):
if typ == "choice":
warnings.warn(
"type argument to addoption() is a string %r."
" For parsearg this is optional and when supplied"
" should be a type."
"`type` argument to addoption() is the string %r."
" For choices this is optional and can be omitted, "
" but when supplied should be a type (for example `str` or `int`)."
" (options: %s)" % (typ, names),
DeprecationWarning,
stacklevel=3,
stacklevel=4,
)
# argparse expects a type here take it from
# the type of the first element
attrs["type"] = type(attrs["choices"][0])
else:
warnings.warn(
"type argument to addoption() is a string %r."
" For parsearg this should be a type."
"`type` argument to addoption() is the string %r, "
" but when supplied should be a type (for example `str` or `int`)."
" (options: %s)" % (typ, names),
DeprecationWarning,
stacklevel=3,
stacklevel=4,
)
attrs["type"] = Argument._typ_map[typ]
# used in test_parseopt -> test_parse_defaultgetter

View File

@@ -102,7 +102,8 @@ class PdbInvoke(object):
def pytest_exception_interact(self, node, call, report):
capman = node.config.pluginmanager.getplugin("capturemanager")
if capman:
out, err = capman.suspend_global_capture(in_=True)
capman.suspend_global_capture(in_=True)
out, err = capman.read_global_capture()
sys.stdout.write(out)
sys.stdout.write(err)
_enter_pdb(node, call.excinfo, report)

View File

@@ -307,8 +307,8 @@ class FuncFixtureInfo(object):
# fixture names specified via usefixtures and via autouse=True in fixture
# definitions.
initialnames = attr.ib(type=tuple)
names_closure = attr.ib(type="List[str]")
name2fixturedefs = attr.ib(type="List[str, List[FixtureDef]]")
names_closure = attr.ib() # type: List[str]
name2fixturedefs = attr.ib() # type: List[str, List[FixtureDef]]
def prune_dependency_tree(self):
"""Recompute names_closure from initialnames and name2fixturedefs

View File

@@ -6,6 +6,7 @@ from contextlib import closing, contextmanager
import re
import six
from _pytest.compat import dummy_context_manager
from _pytest.config import create_terminal_writer
import pytest
import py
@@ -369,11 +370,6 @@ def pytest_configure(config):
config.pluginmanager.register(LoggingPlugin(config), "logging-plugin")
@contextmanager
def _dummy_context_manager():
yield
class LoggingPlugin(object):
"""Attaches to the logging module and captures log messages for each test.
"""
@@ -537,7 +533,7 @@ class LoggingPlugin(object):
log_cli_handler, formatter=log_cli_formatter, level=log_cli_level
)
else:
self.live_logs_context = _dummy_context_manager()
self.live_logs_context = dummy_context_manager()
class _LiveLoggingStreamHandler(logging.StreamHandler):
@@ -572,9 +568,12 @@ class _LiveLoggingStreamHandler(logging.StreamHandler):
self._test_outcome_written = False
def emit(self, record):
if self.capture_manager is not None:
self.capture_manager.suspend_global_capture()
try:
ctx_manager = (
self.capture_manager.global_and_fixture_disabled()
if self.capture_manager
else dummy_context_manager()
)
with ctx_manager:
if not self._first_record_emitted:
self.stream.write("\n")
self._first_record_emitted = True
@@ -586,6 +585,3 @@ class _LiveLoggingStreamHandler(logging.StreamHandler):
self.stream.section("live log " + self._when, sep="-", bold=True)
self._section_name_shown = True
logging.StreamHandler.emit(self, record)
finally:
if self.capture_manager is not None:
self.capture_manager.resume_global_capture()

View File

@@ -505,8 +505,9 @@ class Session(nodes.FSCollector):
root = self._node_cache[pkginit]
else:
col = root._collectfile(pkginit)
if col and isinstance(col, Package):
root = col[0]
if col:
if isinstance(col[0], Package):
root = col[0]
self._node_cache[root.fspath] = root
# If it's a directory argument, recurse and look for any Subpackages.
@@ -624,11 +625,12 @@ class Session(nodes.FSCollector):
resultnodes.append(node)
continue
assert isinstance(node, nodes.Collector)
if node.nodeid in self._node_cache:
rep = self._node_cache[node.nodeid]
key = (type(node), node.nodeid)
if key in self._node_cache:
rep = self._node_cache[key]
else:
rep = collect_one_node(node)
self._node_cache[node.nodeid] = rep
self._node_cache[key] = rep
if rep.passed:
has_matched = False
for x in rep.result:

View File

@@ -550,18 +550,22 @@ class Testdir(object):
return ret
def makefile(self, ext, *args, **kwargs):
"""Create a new file in the testdir.
r"""Create new file(s) in the testdir.
ext: The extension the file should use, including the dot, e.g. `.py`.
args: All args will be treated as strings and joined using newlines.
:param str ext: The extension the file(s) should use, including the dot, e.g. `.py`.
:param list[str] args: All args will be treated as strings and joined using newlines.
The result will be written as contents to the file. The name of the
file will be based on the test function requesting this fixture.
E.g. "testdir.makefile('.txt', 'line1', 'line2')"
kwargs: Each keyword is the name of a file, while the value of it will
:param kwargs: Each keyword is the name of a file, while the value of it will
be written as contents of the file.
E.g. "testdir.makefile('.ini', pytest='[pytest]\naddopts=-rs\n')"
Examples:
.. code-block:: python
testdir.makefile(".txt", "line1", "line2")
testdir.makefile(".ini", pytest="[pytest]\naddopts=-rs\n")
"""
return self._makefile(ext, args, kwargs)

View File

@@ -216,18 +216,6 @@ def pytest_pycollect_makemodule(path, parent):
return Module(path, parent)
def pytest_ignore_collect(path, config):
# Skip duplicate packages.
keepduplicates = config.getoption("keepduplicates")
if keepduplicates:
duplicate_paths = config.pluginmanager._duplicatepaths
if path.basename == "__init__.py":
if path in duplicate_paths:
return True
else:
duplicate_paths.add(path)
@hookimpl(hookwrapper=True)
def pytest_pycollect_makeitem(collector, name, obj):
outcome = yield
@@ -554,9 +542,7 @@ class Package(Module):
self.name = fspath.dirname
self.trace = session.trace
self._norecursepatterns = session._norecursepatterns
for path in list(session.config.pluginmanager._duplicatepaths):
if path.dirname == fspath.dirname and path != fspath:
session.config.pluginmanager._duplicatepaths.remove(path)
self.fspath = fspath
def _recurse(self, path):
ihook = self.gethookproxy(path.dirpath())
@@ -594,6 +580,15 @@ class Package(Module):
return path in self.session._initialpaths
def collect(self):
# XXX: HACK!
# Before starting to collect any files from this package we need
# to cleanup the duplicate paths added by the session's collect().
# Proper fix is to not track these as duplicates in the first place.
for path in list(self.session.config.pluginmanager._duplicatepaths):
# if path.parts()[:len(self.fspath.dirpath().parts())] == self.fspath.dirpath().parts():
if path.dirname.startswith(self.name):
self.session.config.pluginmanager._duplicatepaths.remove(path)
this_path = self.fspath.dirpath()
pkg_prefix = None
yield Module(this_path.join("__init__.py"), self)
@@ -884,12 +879,13 @@ class Metafunc(fixtures.FuncargnamesCompatAttr):
"""
def __init__(self, definition, fixtureinfo, config, cls=None, module=None):
#: access to the :class:`_pytest.config.Config` object for the test session
assert (
isinstance(definition, FunctionDefinition)
or type(definition).__name__ == "DefinitionMock"
)
self.definition = definition
#: access to the :class:`_pytest.config.Config` object for the test session
self.config = config
#: the module object where the test function is defined in.

View File

@@ -51,7 +51,8 @@ def _show_fixture_action(fixturedef, msg):
config = fixturedef._fixturemanager.config
capman = config.pluginmanager.getplugin("capturemanager")
if capman:
out, err = capman.suspend_global_capture()
capman.suspend_global_capture()
out, err = capman.read_global_capture()
tw = config.get_terminal_writer()
tw.line()

View File

@@ -706,7 +706,12 @@ class TerminalReporter(object):
self._outrep_summary(rep)
def print_teardown_sections(self, rep):
showcapture = self.config.option.showcapture
if showcapture == "no":
return
for secname, content in rep.sections:
if showcapture != "all" and showcapture not in secname:
continue
if "teardown" in secname:
self._tw.sep("-", secname)
if content[-1:] == "\n":

View File

@@ -49,6 +49,14 @@ def pytest_addoption(parser):
)
def pytest_configure(config):
config.addinivalue_line(
"markers",
"filterwarnings(warning): add a warning filter to the given test. "
"see http://pytest.org/latest/warnings.html#pytest-mark-filterwarnings ",
)
@contextmanager
def catch_warnings_for_item(item):
"""