diff --git a/src/_pytest/capture.py b/src/_pytest/capture.py index 323881151..64f4b8b92 100644 --- a/src/_pytest/capture.py +++ b/src/_pytest/capture.py @@ -9,22 +9,19 @@ import os import sys from io import UnsupportedOperation from tempfile import TemporaryFile -from typing import Generator from typing import Optional from typing import TextIO +from typing import Tuple import pytest from _pytest.compat import TYPE_CHECKING from _pytest.config import Config -from _pytest.fixtures import FixtureRequest if TYPE_CHECKING: from typing_extensions import Literal _CaptureMethod = Literal["fd", "sys", "no", "tee-sys"] -patchsysdict = {0: "stdin", 1: "stdout", 2: "stderr"} - def pytest_addoption(parser): group = parser.getgroup("general") @@ -45,703 +42,6 @@ def pytest_addoption(parser): ) -@pytest.hookimpl(hookwrapper=True) -def pytest_load_initial_conftests(early_config: Config): - ns = early_config.known_args_namespace - if ns.capture == "fd": - _py36_windowsconsoleio_workaround(sys.stdout) - _colorama_workaround() - _readline_workaround() - pluginmanager = early_config.pluginmanager - capman = CaptureManager(ns.capture) - pluginmanager.register(capman, "capturemanager") - - # make sure that capturemanager is properly reset at final shutdown - early_config.add_cleanup(capman.stop_global_capturing) - - # finally trigger conftest loading but while capturing (issue93) - capman.start_global_capturing() - outcome = yield - capman.suspend_global_capture() - if outcome.excinfo is not None: - out, err = capman.read_global_capture() - sys.stdout.write(out) - sys.stderr.write(err) - - -def _get_multicapture(method: "_CaptureMethod") -> "MultiCapture": - if method == "fd": - return MultiCapture(out=True, err=True, Capture=FDCapture) - elif method == "sys": - return MultiCapture(out=True, err=True, Capture=SysCapture) - elif method == "no": - return MultiCapture(out=False, err=False, in_=False) - elif method == "tee-sys": - return MultiCapture(out=True, err=True, in_=False, Capture=TeeSysCapture) - raise ValueError("unknown capturing method: {!r}".format(method)) - - -class CaptureManager: - """ - Capture plugin, manages that the appropriate capture method is enabled/disabled during collection and each - test phase (setup, call, teardown). After each of those points, the captured output is obtained and - attached to the collection/runtest report. - - There are two levels of capture: - * global: which is enabled by default and can be suppressed by the ``-s`` option. This is always enabled/disabled - during collection and each test phase. - * fixture: when a test function or one of its fixture depend on the ``capsys`` or ``capfd`` fixtures. In this - case special handling is needed to ensure the fixtures take precedence over the global capture. - """ - - def __init__(self, method: "_CaptureMethod") -> None: - self._method = method - self._global_capturing = None - self._capture_fixture = None # type: Optional[CaptureFixture] - - def __repr__(self): - return "".format( - self._method, self._global_capturing, self._capture_fixture - ) - - def is_capturing(self): - if self.is_globally_capturing(): - return "global" - if self._capture_fixture: - return "fixture %s" % self._capture_fixture.request.fixturename - return False - - # Global capturing control - - def is_globally_capturing(self): - return self._method != "no" - - def start_global_capturing(self): - assert self._global_capturing is None - self._global_capturing = _get_multicapture(self._method) - self._global_capturing.start_capturing() - - def stop_global_capturing(self): - if self._global_capturing is not None: - self._global_capturing.pop_outerr_to_orig() - self._global_capturing.stop_capturing() - self._global_capturing = None - - def resume_global_capture(self): - # During teardown of the python process, and on rare occasions, capture - # attributes can be `None` while trying to resume global capture. - if self._global_capturing is not None: - self._global_capturing.resume_capturing() - - def suspend_global_capture(self, in_=False): - cap = getattr(self, "_global_capturing", None) - if cap is not None: - cap.suspend_capturing(in_=in_) - - def suspend(self, in_=False): - # Need to undo local capsys-et-al if it exists before disabling global capture. - self.suspend_fixture() - self.suspend_global_capture(in_) - - def resume(self): - self.resume_global_capture() - self.resume_fixture() - - def read_global_capture(self): - return self._global_capturing.readouterr() - - # Fixture Control (it's just forwarding, think about removing this later) - - @contextlib.contextmanager - def _capturing_for_request( - self, request: FixtureRequest - ) -> Generator["CaptureFixture", None, None]: - """ - Context manager that creates a ``CaptureFixture`` instance for the - given ``request``, ensuring there is only a single one being requested - at the same time. - - This is used as a helper with ``capsys``, ``capfd`` etc. - """ - if self._capture_fixture: - other_name = next( - k - for k, v in map_fixname_class.items() - if v is self._capture_fixture.captureclass - ) - raise request.raiseerror( - "cannot use {} and {} at the same time".format( - request.fixturename, other_name - ) - ) - capture_class = map_fixname_class[request.fixturename] - self._capture_fixture = CaptureFixture(capture_class, request) - self.activate_fixture() - yield self._capture_fixture - self._capture_fixture.close() - self._capture_fixture = None - - def activate_fixture(self): - """If the current item is using ``capsys`` or ``capfd``, activate them so they take precedence over - the global capture. - """ - if self._capture_fixture: - self._capture_fixture._start() - - def deactivate_fixture(self): - """Deactivates the ``capsys`` or ``capfd`` fixture of this item, if any.""" - if self._capture_fixture: - self._capture_fixture.close() - - def suspend_fixture(self): - if self._capture_fixture: - self._capture_fixture._suspend() - - def resume_fixture(self): - if self._capture_fixture: - self._capture_fixture._resume() - - # Helper context managers - - @contextlib.contextmanager - def global_and_fixture_disabled(self): - """Context manager to temporarily disable global and current fixture capturing.""" - self.suspend() - try: - yield - finally: - self.resume() - - @contextlib.contextmanager - def item_capture(self, when, item): - self.resume_global_capture() - self.activate_fixture() - try: - yield - finally: - self.deactivate_fixture() - self.suspend_global_capture(in_=False) - - out, err = self.read_global_capture() - item.add_report_section(when, "stdout", out) - item.add_report_section(when, "stderr", err) - - # Hooks - - @pytest.hookimpl(hookwrapper=True) - def pytest_make_collect_report(self, collector): - if isinstance(collector, pytest.File): - self.resume_global_capture() - outcome = yield - self.suspend_global_capture() - out, err = self.read_global_capture() - rep = outcome.get_result() - if out: - rep.sections.append(("Captured stdout", out)) - if err: - rep.sections.append(("Captured stderr", err)) - else: - yield - - @pytest.hookimpl(hookwrapper=True) - def pytest_runtest_setup(self, item): - with self.item_capture("setup", item): - yield - - @pytest.hookimpl(hookwrapper=True) - def pytest_runtest_call(self, item): - with self.item_capture("call", item): - yield - - @pytest.hookimpl(hookwrapper=True) - def pytest_runtest_teardown(self, item): - with self.item_capture("teardown", item): - yield - - @pytest.hookimpl(tryfirst=True) - def pytest_keyboard_interrupt(self, excinfo): - self.stop_global_capturing() - - @pytest.hookimpl(tryfirst=True) - def pytest_internalerror(self, excinfo): - self.stop_global_capturing() - - -@pytest.fixture -def capsys(request): - """Enable text capturing of writes to ``sys.stdout`` and ``sys.stderr``. - - The captured output is made available via ``capsys.readouterr()`` method - calls, which return a ``(out, err)`` namedtuple. - ``out`` and ``err`` will be ``text`` objects. - """ - capman = request.config.pluginmanager.getplugin("capturemanager") - with capman._capturing_for_request(request) as fixture: - yield fixture - - -@pytest.fixture -def capsysbinary(request): - """Enable bytes capturing of writes to ``sys.stdout`` and ``sys.stderr``. - - The captured output is made available via ``capsysbinary.readouterr()`` - method calls, which return a ``(out, err)`` namedtuple. - ``out`` and ``err`` will be ``bytes`` objects. - """ - capman = request.config.pluginmanager.getplugin("capturemanager") - with capman._capturing_for_request(request) as fixture: - yield fixture - - -@pytest.fixture -def capfd(request): - """Enable text capturing of writes to file descriptors ``1`` and ``2``. - - The captured output is made available via ``capfd.readouterr()`` method - calls, which return a ``(out, err)`` namedtuple. - ``out`` and ``err`` will be ``text`` objects. - """ - capman = request.config.pluginmanager.getplugin("capturemanager") - with capman._capturing_for_request(request) as fixture: - yield fixture - - -@pytest.fixture -def capfdbinary(request): - """Enable bytes capturing of writes to file descriptors ``1`` and ``2``. - - The captured output is made available via ``capfd.readouterr()`` method - calls, which return a ``(out, err)`` namedtuple. - ``out`` and ``err`` will be ``byte`` objects. - """ - capman = request.config.pluginmanager.getplugin("capturemanager") - with capman._capturing_for_request(request) as fixture: - yield fixture - - -class CaptureIO(io.TextIOWrapper): - def __init__(self) -> None: - super().__init__(io.BytesIO(), encoding="UTF-8", newline="", write_through=True) - - def getvalue(self) -> str: - assert isinstance(self.buffer, io.BytesIO) - return self.buffer.getvalue().decode("UTF-8") - - -class TeeCaptureIO(CaptureIO): - def __init__(self, other: TextIO) -> None: - self._other = other - super().__init__() - - def write(self, s: str) -> int: - super().write(s) - return self._other.write(s) - - -class CaptureFixture: - """ - Object returned by :py:func:`capsys`, :py:func:`capsysbinary`, :py:func:`capfd` and :py:func:`capfdbinary` - fixtures. - """ - - def __init__(self, captureclass, request): - self.captureclass = captureclass - self.request = request - self._capture = None - self._captured_out = self.captureclass.EMPTY_BUFFER - self._captured_err = self.captureclass.EMPTY_BUFFER - - def _start(self): - if self._capture is None: - self._capture = MultiCapture( - out=True, err=True, in_=False, Capture=self.captureclass - ) - self._capture.start_capturing() - - def close(self): - if self._capture is not None: - out, err = self._capture.pop_outerr_to_orig() - self._captured_out += out - self._captured_err += err - self._capture.stop_capturing() - self._capture = None - - def readouterr(self): - """Read and return the captured output so far, resetting the internal buffer. - - :return: captured content as a namedtuple with ``out`` and ``err`` string attributes - """ - captured_out, captured_err = self._captured_out, self._captured_err - if self._capture is not None: - out, err = self._capture.readouterr() - captured_out += out - captured_err += err - self._captured_out = self.captureclass.EMPTY_BUFFER - self._captured_err = self.captureclass.EMPTY_BUFFER - return CaptureResult(captured_out, captured_err) - - def _suspend(self): - """Suspends this fixture's own capturing temporarily.""" - if self._capture is not None: - self._capture.suspend_capturing() - - def _resume(self): - """Resumes this fixture's own capturing temporarily.""" - if self._capture is not None: - self._capture.resume_capturing() - - @contextlib.contextmanager - def disabled(self): - """Temporarily disables capture while inside the 'with' block.""" - capmanager = self.request.config.pluginmanager.getplugin("capturemanager") - with capmanager.global_and_fixture_disabled(): - yield - - -class EncodedFile(io.TextIOWrapper): - __slots__ = () - - @property - def name(self) -> str: - # Ensure that file.name is a string. Workaround for a Python bug - # fixed in >=3.7.4: https://bugs.python.org/issue36015 - return repr(self.buffer) - - @property - def mode(self) -> str: - # TextIOWrapper doesn't expose a mode, but at least some of our - # tests check it. - return self.buffer.mode.replace("b", "") - - -CaptureResult = collections.namedtuple("CaptureResult", ["out", "err"]) - - -class MultiCapture: - out = err = in_ = None - _state = None - _in_suspended = False - - def __init__(self, out=True, err=True, in_=True, Capture=None): - if in_: - self.in_ = Capture(0) - if out: - self.out = Capture(1) - if err: - self.err = Capture(2) - - def __repr__(self): - return "".format( - self.out, self.err, self.in_, self._state, self._in_suspended, - ) - - def start_capturing(self): - self._state = "started" - if self.in_: - self.in_.start() - if self.out: - self.out.start() - if self.err: - self.err.start() - - def pop_outerr_to_orig(self): - """ pop current snapshot out/err capture and flush to orig streams. """ - out, err = self.readouterr() - if out: - self.out.writeorg(out) - if err: - self.err.writeorg(err) - return out, err - - def suspend_capturing(self, in_=False): - self._state = "suspended" - if self.out: - self.out.suspend() - if self.err: - self.err.suspend() - if in_ and self.in_: - self.in_.suspend() - self._in_suspended = True - - def resume_capturing(self): - self._state = "resumed" - if self.out: - self.out.resume() - if self.err: - self.err.resume() - if self._in_suspended: - self.in_.resume() - self._in_suspended = False - - def stop_capturing(self): - """ stop capturing and reset capturing streams """ - if self._state == "stopped": - raise ValueError("was already stopped") - self._state = "stopped" - if self.out: - self.out.done() - if self.err: - self.err.done() - if self.in_: - self.in_.done() - - def readouterr(self) -> CaptureResult: - if self.out: - out = self.out.snap() - else: - out = "" - if self.err: - err = self.err.snap() - else: - err = "" - return CaptureResult(out, err) - - -class NoCapture: - EMPTY_BUFFER = None - __init__ = start = done = suspend = resume = lambda *args: None - - -class FDCaptureBinary: - """Capture IO to/from a given os-level filedescriptor. - - snap() produces `bytes` - """ - - EMPTY_BUFFER = b"" - _state = None - - def __init__(self, targetfd, tmpfile=None): - self.targetfd = targetfd - - try: - os.fstat(targetfd) - except OSError: - # FD capturing is conceptually simple -- create a temporary file, - # redirect the FD to it, redirect back when done. But when the - # target FD is invalid it throws a wrench into this loveley scheme. - # - # Tests themselves shouldn't care if the FD is valid, FD capturing - # should work regardless of external circumstances. So falling back - # to just sys capturing is not a good option. - # - # Further complications are the need to support suspend() and the - # possibility of FD reuse (e.g. the tmpfile getting the very same - # target FD). The following approach is robust, I believe. - self.targetfd_invalid = os.open(os.devnull, os.O_RDWR) - os.dup2(self.targetfd_invalid, targetfd) - else: - self.targetfd_invalid = None - self.targetfd_save = os.dup(targetfd) - - if targetfd == 0: - assert not tmpfile, "cannot set tmpfile with stdin" - tmpfile = open(os.devnull) - self.syscapture = SysCapture(targetfd) - else: - if tmpfile is None: - tmpfile = EncodedFile( - TemporaryFile(buffering=0), - encoding="utf-8", - errors="replace", - write_through=True, - ) - if targetfd in patchsysdict: - self.syscapture = SysCapture(targetfd, tmpfile) - else: - self.syscapture = NoCapture() - self.tmpfile = tmpfile - - def __repr__(self): - return "<{} {} oldfd={} _state={!r} tmpfile={!r}>".format( - self.__class__.__name__, - self.targetfd, - self.targetfd_save, - self._state, - self.tmpfile, - ) - - def start(self): - """ Start capturing on targetfd using memorized tmpfile. """ - os.dup2(self.tmpfile.fileno(), self.targetfd) - self.syscapture.start() - self._state = "started" - - def snap(self): - self.tmpfile.seek(0) - res = self.tmpfile.buffer.read() - self.tmpfile.seek(0) - self.tmpfile.truncate() - return res - - def done(self): - """ stop capturing, restore streams, return original capture file, - seeked to position zero. """ - os.dup2(self.targetfd_save, self.targetfd) - os.close(self.targetfd_save) - if self.targetfd_invalid is not None: - if self.targetfd_invalid != self.targetfd: - os.close(self.targetfd) - os.close(self.targetfd_invalid) - self.syscapture.done() - self.tmpfile.close() - self._state = "done" - - def suspend(self): - self.syscapture.suspend() - os.dup2(self.targetfd_save, self.targetfd) - self._state = "suspended" - - def resume(self): - self.syscapture.resume() - os.dup2(self.tmpfile.fileno(), self.targetfd) - self._state = "resumed" - - def writeorg(self, data): - """ write to original file descriptor. """ - os.write(self.targetfd_save, data) - - -class FDCapture(FDCaptureBinary): - """Capture IO to/from a given os-level filedescriptor. - - snap() produces text - """ - - # Ignore type because it doesn't match the type in the superclass (bytes). - EMPTY_BUFFER = "" # type: ignore - - def snap(self): - self.tmpfile.seek(0) - res = self.tmpfile.read() - self.tmpfile.seek(0) - self.tmpfile.truncate() - return res - - def writeorg(self, data): - """ write to original file descriptor. """ - super().writeorg(data.encode("utf-8")) # XXX use encoding of original stream - - -class SysCaptureBinary: - - EMPTY_BUFFER = b"" - _state = None - - def __init__(self, fd, tmpfile=None): - name = patchsysdict[fd] - self._old = getattr(sys, name) - self.name = name - if tmpfile is None: - if name == "stdin": - tmpfile = DontReadFromInput() - else: - tmpfile = CaptureIO() - self.tmpfile = tmpfile - - def __repr__(self): - return "<{} {} _old={} _state={!r} tmpfile={!r}>".format( - self.__class__.__name__, - self.name, - hasattr(self, "_old") and repr(self._old) or "", - self._state, - self.tmpfile, - ) - - def start(self): - setattr(sys, self.name, self.tmpfile) - self._state = "started" - - def snap(self): - res = self.tmpfile.buffer.getvalue() - self.tmpfile.seek(0) - self.tmpfile.truncate() - return res - - def done(self): - setattr(sys, self.name, self._old) - del self._old - self.tmpfile.close() - self._state = "done" - - def suspend(self): - setattr(sys, self.name, self._old) - self._state = "suspended" - - def resume(self): - setattr(sys, self.name, self.tmpfile) - self._state = "resumed" - - def writeorg(self, data): - self._old.flush() - self._old.buffer.write(data) - self._old.buffer.flush() - - -class SysCapture(SysCaptureBinary): - EMPTY_BUFFER = "" # type: ignore[assignment] - - def snap(self): - res = self.tmpfile.getvalue() - self.tmpfile.seek(0) - self.tmpfile.truncate() - return res - - def writeorg(self, data): - self._old.write(data) - self._old.flush() - - -class TeeSysCapture(SysCapture): - def __init__(self, fd, tmpfile=None): - name = patchsysdict[fd] - self._old = getattr(sys, name) - self.name = name - if tmpfile is None: - if name == "stdin": - tmpfile = DontReadFromInput() - else: - tmpfile = TeeCaptureIO(self._old) - self.tmpfile = tmpfile - - -map_fixname_class = { - "capfd": FDCapture, - "capfdbinary": FDCaptureBinary, - "capsys": SysCapture, - "capsysbinary": SysCaptureBinary, -} - - -class DontReadFromInput: - encoding = None - - def read(self, *args): - raise OSError( - "pytest: reading from stdin while output is captured! Consider using `-s`." - ) - - readline = read - readlines = read - __next__ = read - - def __iter__(self): - return self - - def fileno(self): - raise UnsupportedOperation("redirected stdin is pseudofile, has no fileno()") - - def isatty(self): - return False - - def close(self): - pass - - @property - def buffer(self): - return self - - def _colorama_workaround(): """ Ensure colorama is imported so that it attaches to the correct stdio @@ -839,3 +139,740 @@ def _py36_windowsconsoleio_workaround(stream): sys.stdin = _reopen_stdio(sys.stdin, "rb") sys.stdout = _reopen_stdio(sys.stdout, "wb") sys.stderr = _reopen_stdio(sys.stderr, "wb") + + +@pytest.hookimpl(hookwrapper=True) +def pytest_load_initial_conftests(early_config: Config): + ns = early_config.known_args_namespace + if ns.capture == "fd": + _py36_windowsconsoleio_workaround(sys.stdout) + _colorama_workaround() + _readline_workaround() + pluginmanager = early_config.pluginmanager + capman = CaptureManager(ns.capture) + pluginmanager.register(capman, "capturemanager") + + # make sure that capturemanager is properly reset at final shutdown + early_config.add_cleanup(capman.stop_global_capturing) + + # finally trigger conftest loading but while capturing (issue93) + capman.start_global_capturing() + outcome = yield + capman.suspend_global_capture() + if outcome.excinfo is not None: + out, err = capman.read_global_capture() + sys.stdout.write(out) + sys.stderr.write(err) + + +# IO Helpers. + + +class EncodedFile(io.TextIOWrapper): + __slots__ = () + + @property + def name(self) -> str: + # Ensure that file.name is a string. Workaround for a Python bug + # fixed in >=3.7.4: https://bugs.python.org/issue36015 + return repr(self.buffer) + + @property + def mode(self) -> str: + # TextIOWrapper doesn't expose a mode, but at least some of our + # tests check it. + return self.buffer.mode.replace("b", "") + + +class CaptureIO(io.TextIOWrapper): + def __init__(self) -> None: + super().__init__(io.BytesIO(), encoding="UTF-8", newline="", write_through=True) + + def getvalue(self) -> str: + assert isinstance(self.buffer, io.BytesIO) + return self.buffer.getvalue().decode("UTF-8") + + +class TeeCaptureIO(CaptureIO): + def __init__(self, other: TextIO) -> None: + self._other = other + super().__init__() + + def write(self, s) -> int: + super().write(s) + return self._other.write(s) + + +class DontReadFromInput: + encoding = None + + def read(self, *args): + raise OSError( + "pytest: reading from stdin while output is captured! Consider using `-s`." + ) + + readline = read + readlines = read + __next__ = read + + def __iter__(self): + return self + + def fileno(self): + raise UnsupportedOperation("redirected stdin is pseudofile, has no fileno()") + + def isatty(self): + return False + + def close(self): + pass + + @property + def buffer(self): + return self + + +# Capture classes. + + +patchsysdict = {0: "stdin", 1: "stdout", 2: "stderr"} + + +class NoCapture: + EMPTY_BUFFER = None + __init__ = start = done = suspend = resume = lambda *args: None + + +class SysCaptureBinary: + + EMPTY_BUFFER = b"" + + def __init__(self, fd, tmpfile=None, *, tee=False): + name = patchsysdict[fd] + self._old = getattr(sys, name) + self.name = name + if tmpfile is None: + if name == "stdin": + tmpfile = DontReadFromInput() + else: + tmpfile = CaptureIO() if not tee else TeeCaptureIO(self._old) + self.tmpfile = tmpfile + self._state = "initialized" + + def repr(self, class_name: str) -> str: + return "<{} {} _old={} _state={!r} tmpfile={!r}>".format( + class_name, + self.name, + hasattr(self, "_old") and repr(self._old) or "", + self._state, + self.tmpfile, + ) + + def __repr__(self) -> str: + return "<{} {} _old={} _state={!r} tmpfile={!r}>".format( + self.__class__.__name__, + self.name, + hasattr(self, "_old") and repr(self._old) or "", + self._state, + self.tmpfile, + ) + + def _assert_state(self, op: str, states: Tuple[str, ...]) -> None: + assert ( + self._state in states + ), "cannot {} in state {!r}: expected one of {}".format( + op, self._state, ", ".join(states) + ) + + def start(self): + self._assert_state("start", ("initialized",)) + setattr(sys, self.name, self.tmpfile) + self._state = "started" + + def snap(self): + self._assert_state("snap", ("started", "suspended")) + self.tmpfile.seek(0) + res = self.tmpfile.buffer.read() + self.tmpfile.seek(0) + self.tmpfile.truncate() + return res + + def done(self): + self._assert_state("done", ("initialized", "started", "suspended", "done")) + if self._state == "done": + return + setattr(sys, self.name, self._old) + del self._old + self.tmpfile.close() + self._state = "done" + + def suspend(self): + self._assert_state("suspend", ("started", "suspended")) + setattr(sys, self.name, self._old) + self._state = "suspended" + + def resume(self): + self._assert_state("resume", ("started", "suspended")) + if self._state == "started": + return + setattr(sys, self.name, self.tmpfile) + self._state = "started" + + def writeorg(self, data): + self._assert_state("writeorg", ("started", "suspended")) + self._old.flush() + self._old.buffer.write(data) + self._old.buffer.flush() + + +class SysCapture(SysCaptureBinary): + EMPTY_BUFFER = "" # type: ignore[assignment] + + def snap(self): + res = self.tmpfile.getvalue() + self.tmpfile.seek(0) + self.tmpfile.truncate() + return res + + def writeorg(self, data): + self._assert_state("writeorg", ("started", "suspended")) + self._old.write(data) + self._old.flush() + + +class FDCaptureBinary: + """Capture IO to/from a given os-level filedescriptor. + + snap() produces `bytes` + """ + + EMPTY_BUFFER = b"" + + def __init__(self, targetfd): + self.targetfd = targetfd + + try: + os.fstat(targetfd) + except OSError: + # FD capturing is conceptually simple -- create a temporary file, + # redirect the FD to it, redirect back when done. But when the + # target FD is invalid it throws a wrench into this loveley scheme. + # + # Tests themselves shouldn't care if the FD is valid, FD capturing + # should work regardless of external circumstances. So falling back + # to just sys capturing is not a good option. + # + # Further complications are the need to support suspend() and the + # possibility of FD reuse (e.g. the tmpfile getting the very same + # target FD). The following approach is robust, I believe. + self.targetfd_invalid = os.open(os.devnull, os.O_RDWR) + os.dup2(self.targetfd_invalid, targetfd) + else: + self.targetfd_invalid = None + self.targetfd_save = os.dup(targetfd) + + if targetfd == 0: + self.tmpfile = open(os.devnull) + self.syscapture = SysCapture(targetfd) + else: + self.tmpfile = EncodedFile( + TemporaryFile(buffering=0), + encoding="utf-8", + errors="replace", + write_through=True, + ) + if targetfd in patchsysdict: + self.syscapture = SysCapture(targetfd, self.tmpfile) + else: + self.syscapture = NoCapture() + + self._state = "initialized" + + def __repr__(self): + return "<{} {} oldfd={} _state={!r} tmpfile={!r}>".format( + self.__class__.__name__, + self.targetfd, + self.targetfd_save, + self._state, + self.tmpfile, + ) + + def _assert_state(self, op: str, states: Tuple[str, ...]) -> None: + assert ( + self._state in states + ), "cannot {} in state {!r}: expected one of {}".format( + op, self._state, ", ".join(states) + ) + + def start(self): + """ Start capturing on targetfd using memorized tmpfile. """ + self._assert_state("start", ("initialized",)) + os.dup2(self.tmpfile.fileno(), self.targetfd) + self.syscapture.start() + self._state = "started" + + def snap(self): + self._assert_state("snap", ("started", "suspended")) + self.tmpfile.seek(0) + res = self.tmpfile.buffer.read() + self.tmpfile.seek(0) + self.tmpfile.truncate() + return res + + def done(self): + """ stop capturing, restore streams, return original capture file, + seeked to position zero. """ + self._assert_state("done", ("initialized", "started", "suspended", "done")) + if self._state == "done": + return + os.dup2(self.targetfd_save, self.targetfd) + os.close(self.targetfd_save) + if self.targetfd_invalid is not None: + if self.targetfd_invalid != self.targetfd: + os.close(self.targetfd) + os.close(self.targetfd_invalid) + self.syscapture.done() + self.tmpfile.close() + self._state = "done" + + def suspend(self): + self._assert_state("suspend", ("started", "suspended")) + if self._state == "suspended": + return + self.syscapture.suspend() + os.dup2(self.targetfd_save, self.targetfd) + self._state = "suspended" + + def resume(self): + self._assert_state("resume", ("started", "suspended")) + if self._state == "started": + return + self.syscapture.resume() + os.dup2(self.tmpfile.fileno(), self.targetfd) + self._state = "started" + + def writeorg(self, data): + """ write to original file descriptor. """ + self._assert_state("writeorg", ("started", "suspended")) + os.write(self.targetfd_save, data) + + +class FDCapture(FDCaptureBinary): + """Capture IO to/from a given os-level filedescriptor. + + snap() produces text + """ + + # Ignore type because it doesn't match the type in the superclass (bytes). + EMPTY_BUFFER = "" # type: ignore + + def snap(self): + self._assert_state("snap", ("started", "suspended")) + self.tmpfile.seek(0) + res = self.tmpfile.read() + self.tmpfile.seek(0) + self.tmpfile.truncate() + return res + + def writeorg(self, data): + """ write to original file descriptor. """ + super().writeorg(data.encode("utf-8")) # XXX use encoding of original stream + + +# MultiCapture + +CaptureResult = collections.namedtuple("CaptureResult", ["out", "err"]) + + +class MultiCapture: + _state = None + _in_suspended = False + + def __init__(self, in_, out, err) -> None: + self.in_ = in_ + self.out = out + self.err = err + + def __repr__(self): + return "".format( + self.out, self.err, self.in_, self._state, self._in_suspended, + ) + + def start_capturing(self): + self._state = "started" + if self.in_: + self.in_.start() + if self.out: + self.out.start() + if self.err: + self.err.start() + + def pop_outerr_to_orig(self): + """ pop current snapshot out/err capture and flush to orig streams. """ + out, err = self.readouterr() + if out: + self.out.writeorg(out) + if err: + self.err.writeorg(err) + return out, err + + def suspend_capturing(self, in_=False): + self._state = "suspended" + if self.out: + self.out.suspend() + if self.err: + self.err.suspend() + if in_ and self.in_: + self.in_.suspend() + self._in_suspended = True + + def resume_capturing(self): + self._state = "resumed" + if self.out: + self.out.resume() + if self.err: + self.err.resume() + if self._in_suspended: + self.in_.resume() + self._in_suspended = False + + def stop_capturing(self): + """ stop capturing and reset capturing streams """ + if self._state == "stopped": + raise ValueError("was already stopped") + self._state = "stopped" + if self.out: + self.out.done() + if self.err: + self.err.done() + if self.in_: + self.in_.done() + + def readouterr(self) -> CaptureResult: + if self.out: + out = self.out.snap() + else: + out = "" + if self.err: + err = self.err.snap() + else: + err = "" + return CaptureResult(out, err) + + +def _get_multicapture(method: "_CaptureMethod") -> MultiCapture: + if method == "fd": + return MultiCapture(in_=FDCapture(0), out=FDCapture(1), err=FDCapture(2)) + elif method == "sys": + return MultiCapture(in_=SysCapture(0), out=SysCapture(1), err=SysCapture(2)) + elif method == "no": + return MultiCapture(in_=None, out=None, err=None) + elif method == "tee-sys": + return MultiCapture( + in_=None, out=SysCapture(1, tee=True), err=SysCapture(2, tee=True) + ) + raise ValueError("unknown capturing method: {!r}".format(method)) + + +# CaptureManager and CaptureFixture + + +class CaptureManager: + """ + Capture plugin, manages that the appropriate capture method is enabled/disabled during collection and each + test phase (setup, call, teardown). After each of those points, the captured output is obtained and + attached to the collection/runtest report. + + There are two levels of capture: + * global: which is enabled by default and can be suppressed by the ``-s`` option. This is always enabled/disabled + during collection and each test phase. + * fixture: when a test function or one of its fixture depend on the ``capsys`` or ``capfd`` fixtures. In this + case special handling is needed to ensure the fixtures take precedence over the global capture. + """ + + def __init__(self, method: "_CaptureMethod") -> None: + self._method = method + self._global_capturing = None + self._capture_fixture = None # type: Optional[CaptureFixture] + + def __repr__(self): + return "".format( + self._method, self._global_capturing, self._capture_fixture + ) + + def is_capturing(self): + if self.is_globally_capturing(): + return "global" + if self._capture_fixture: + return "fixture %s" % self._capture_fixture.request.fixturename + return False + + # Global capturing control + + def is_globally_capturing(self): + return self._method != "no" + + def start_global_capturing(self): + assert self._global_capturing is None + self._global_capturing = _get_multicapture(self._method) + self._global_capturing.start_capturing() + + def stop_global_capturing(self): + if self._global_capturing is not None: + self._global_capturing.pop_outerr_to_orig() + self._global_capturing.stop_capturing() + self._global_capturing = None + + def resume_global_capture(self): + # During teardown of the python process, and on rare occasions, capture + # attributes can be `None` while trying to resume global capture. + if self._global_capturing is not None: + self._global_capturing.resume_capturing() + + def suspend_global_capture(self, in_=False): + if self._global_capturing is not None: + self._global_capturing.suspend_capturing(in_=in_) + + def suspend(self, in_=False): + # Need to undo local capsys-et-al if it exists before disabling global capture. + self.suspend_fixture() + self.suspend_global_capture(in_) + + def resume(self): + self.resume_global_capture() + self.resume_fixture() + + def read_global_capture(self): + return self._global_capturing.readouterr() + + # Fixture Control + + def set_fixture(self, capture_fixture: "CaptureFixture") -> None: + if self._capture_fixture: + current_fixture = self._capture_fixture.request.fixturename + requested_fixture = capture_fixture.request.fixturename + capture_fixture.request.raiseerror( + "cannot use {} and {} at the same time".format( + requested_fixture, current_fixture + ) + ) + self._capture_fixture = capture_fixture + + def unset_fixture(self) -> None: + self._capture_fixture = None + + def activate_fixture(self): + """If the current item is using ``capsys`` or ``capfd``, activate them so they take precedence over + the global capture. + """ + if self._capture_fixture: + self._capture_fixture._start() + + def deactivate_fixture(self): + """Deactivates the ``capsys`` or ``capfd`` fixture of this item, if any.""" + if self._capture_fixture: + self._capture_fixture.close() + + def suspend_fixture(self): + if self._capture_fixture: + self._capture_fixture._suspend() + + def resume_fixture(self): + if self._capture_fixture: + self._capture_fixture._resume() + + # Helper context managers + + @contextlib.contextmanager + def global_and_fixture_disabled(self): + """Context manager to temporarily disable global and current fixture capturing.""" + self.suspend() + try: + yield + finally: + self.resume() + + @contextlib.contextmanager + def item_capture(self, when, item): + self.resume_global_capture() + self.activate_fixture() + try: + yield + finally: + self.deactivate_fixture() + self.suspend_global_capture(in_=False) + + out, err = self.read_global_capture() + item.add_report_section(when, "stdout", out) + item.add_report_section(when, "stderr", err) + + # Hooks + + @pytest.hookimpl(hookwrapper=True) + def pytest_make_collect_report(self, collector): + if isinstance(collector, pytest.File): + self.resume_global_capture() + outcome = yield + self.suspend_global_capture() + out, err = self.read_global_capture() + rep = outcome.get_result() + if out: + rep.sections.append(("Captured stdout", out)) + if err: + rep.sections.append(("Captured stderr", err)) + else: + yield + + @pytest.hookimpl(hookwrapper=True) + def pytest_runtest_setup(self, item): + with self.item_capture("setup", item): + yield + + @pytest.hookimpl(hookwrapper=True) + def pytest_runtest_call(self, item): + with self.item_capture("call", item): + yield + + @pytest.hookimpl(hookwrapper=True) + def pytest_runtest_teardown(self, item): + with self.item_capture("teardown", item): + yield + + @pytest.hookimpl(tryfirst=True) + def pytest_keyboard_interrupt(self, excinfo): + self.stop_global_capturing() + + @pytest.hookimpl(tryfirst=True) + def pytest_internalerror(self, excinfo): + self.stop_global_capturing() + + +class CaptureFixture: + """ + Object returned by :py:func:`capsys`, :py:func:`capsysbinary`, :py:func:`capfd` and :py:func:`capfdbinary` + fixtures. + """ + + def __init__(self, captureclass, request): + self.captureclass = captureclass + self.request = request + self._capture = None + self._captured_out = self.captureclass.EMPTY_BUFFER + self._captured_err = self.captureclass.EMPTY_BUFFER + + def _start(self): + if self._capture is None: + self._capture = MultiCapture( + in_=None, out=self.captureclass(1), err=self.captureclass(2), + ) + self._capture.start_capturing() + + def close(self): + if self._capture is not None: + out, err = self._capture.pop_outerr_to_orig() + self._captured_out += out + self._captured_err += err + self._capture.stop_capturing() + self._capture = None + + def readouterr(self): + """Read and return the captured output so far, resetting the internal buffer. + + :return: captured content as a namedtuple with ``out`` and ``err`` string attributes + """ + captured_out, captured_err = self._captured_out, self._captured_err + if self._capture is not None: + out, err = self._capture.readouterr() + captured_out += out + captured_err += err + self._captured_out = self.captureclass.EMPTY_BUFFER + self._captured_err = self.captureclass.EMPTY_BUFFER + return CaptureResult(captured_out, captured_err) + + def _suspend(self): + """Suspends this fixture's own capturing temporarily.""" + if self._capture is not None: + self._capture.suspend_capturing() + + def _resume(self): + """Resumes this fixture's own capturing temporarily.""" + if self._capture is not None: + self._capture.resume_capturing() + + @contextlib.contextmanager + def disabled(self): + """Temporarily disables capture while inside the 'with' block.""" + capmanager = self.request.config.pluginmanager.getplugin("capturemanager") + with capmanager.global_and_fixture_disabled(): + yield + + +# The fixtures. + + +@pytest.fixture +def capsys(request): + """Enable text capturing of writes to ``sys.stdout`` and ``sys.stderr``. + + The captured output is made available via ``capsys.readouterr()`` method + calls, which return a ``(out, err)`` namedtuple. + ``out`` and ``err`` will be ``text`` objects. + """ + capman = request.config.pluginmanager.getplugin("capturemanager") + capture_fixture = CaptureFixture(SysCapture, request) + capman.set_fixture(capture_fixture) + capture_fixture._start() + yield capture_fixture + capture_fixture.close() + capman.unset_fixture() + + +@pytest.fixture +def capsysbinary(request): + """Enable bytes capturing of writes to ``sys.stdout`` and ``sys.stderr``. + + The captured output is made available via ``capsysbinary.readouterr()`` + method calls, which return a ``(out, err)`` namedtuple. + ``out`` and ``err`` will be ``bytes`` objects. + """ + capman = request.config.pluginmanager.getplugin("capturemanager") + capture_fixture = CaptureFixture(SysCaptureBinary, request) + capman.set_fixture(capture_fixture) + capture_fixture._start() + yield capture_fixture + capture_fixture.close() + capman.unset_fixture() + + +@pytest.fixture +def capfd(request): + """Enable text capturing of writes to file descriptors ``1`` and ``2``. + + The captured output is made available via ``capfd.readouterr()`` method + calls, which return a ``(out, err)`` namedtuple. + ``out`` and ``err`` will be ``text`` objects. + """ + capman = request.config.pluginmanager.getplugin("capturemanager") + capture_fixture = CaptureFixture(FDCapture, request) + capman.set_fixture(capture_fixture) + capture_fixture._start() + yield capture_fixture + capture_fixture.close() + capman.unset_fixture() + + +@pytest.fixture +def capfdbinary(request): + """Enable bytes capturing of writes to file descriptors ``1`` and ``2``. + + The captured output is made available via ``capfd.readouterr()`` method + calls, which return a ``(out, err)`` namedtuple. + ``out`` and ``err`` will be ``byte`` objects. + """ + capman = request.config.pluginmanager.getplugin("capturemanager") + capture_fixture = CaptureFixture(FDCaptureBinary, request) + capman.set_fixture(capture_fixture) + capture_fixture._start() + yield capture_fixture + capture_fixture.close() + capman.unset_fixture() diff --git a/src/_pytest/pytester.py b/src/_pytest/pytester.py index 1da478736..9df86a22f 100644 --- a/src/_pytest/pytester.py +++ b/src/_pytest/pytester.py @@ -25,8 +25,7 @@ import py import pytest from _pytest._code import Source -from _pytest.capture import MultiCapture -from _pytest.capture import SysCapture +from _pytest.capture import _get_multicapture from _pytest.compat import TYPE_CHECKING from _pytest.config import _PluggyPlugin from _pytest.config import Config @@ -972,7 +971,7 @@ class Testdir: if syspathinsert: self.syspathinsert() now = time.time() - capture = MultiCapture(Capture=SysCapture) + capture = _get_multicapture("sys") capture.start_capturing() try: try: diff --git a/testing/test_capture.py b/testing/test_capture.py index 177d72ebc..1301a0e69 100644 --- a/testing/test_capture.py +++ b/testing/test_capture.py @@ -19,16 +19,28 @@ from _pytest.config import ExitCode # pylib 1.4.20.dev2 (rev 13d9af95547e) -def StdCaptureFD(out=True, err=True, in_=True): - return capture.MultiCapture(out, err, in_, Capture=capture.FDCapture) +def StdCaptureFD(out: bool = True, err: bool = True, in_: bool = True) -> MultiCapture: + return capture.MultiCapture( + in_=capture.FDCapture(0) if in_ else None, + out=capture.FDCapture(1) if out else None, + err=capture.FDCapture(2) if err else None, + ) -def StdCapture(out=True, err=True, in_=True): - return capture.MultiCapture(out, err, in_, Capture=capture.SysCapture) +def StdCapture(out: bool = True, err: bool = True, in_: bool = True) -> MultiCapture: + return capture.MultiCapture( + in_=capture.SysCapture(0) if in_ else None, + out=capture.SysCapture(1) if out else None, + err=capture.SysCapture(2) if err else None, + ) -def TeeStdCapture(out=True, err=True, in_=True): - return capture.MultiCapture(out, err, in_, Capture=capture.TeeSysCapture) +def TeeStdCapture(out: bool = True, err: bool = True, in_: bool = True) -> MultiCapture: + return capture.MultiCapture( + in_=capture.SysCapture(0, tee=True) if in_ else None, + out=capture.SysCapture(1, tee=True) if out else None, + err=capture.SysCapture(2, tee=True) if err else None, + ) class TestCaptureManager: @@ -866,9 +878,8 @@ class TestFDCapture: cap = capture.FDCapture(fd) data = b"hello" os.write(fd, data) - s = cap.snap() + pytest.raises(AssertionError, cap.snap) cap.done() - assert not s cap = capture.FDCapture(fd) cap.start() os.write(fd, data) @@ -889,7 +900,7 @@ class TestFDCapture: fd = tmpfile.fileno() cap = capture.FDCapture(fd) cap.done() - pytest.raises(ValueError, cap.start) + pytest.raises(AssertionError, cap.start) def test_stderr(self): cap = capture.FDCapture(2) @@ -940,7 +951,7 @@ class TestFDCapture: assert s == "but now yes\n" cap.suspend() cap.done() - pytest.raises(AttributeError, cap.suspend) + pytest.raises(AssertionError, cap.suspend) assert repr(cap) == ( "".format( @@ -1142,6 +1153,7 @@ class TestStdCaptureFD(TestStdCapture): with lsof_check(): for i in range(10): cap = StdCaptureFD() + cap.start_capturing() cap.stop_capturing() @@ -1154,12 +1166,16 @@ class TestStdCaptureFDinvalidFD: from _pytest import capture def StdCaptureFD(out=True, err=True, in_=True): - return capture.MultiCapture(out, err, in_, Capture=capture.FDCapture) + return capture.MultiCapture( + in_=capture.FDCapture(0) if in_ else None, + out=capture.FDCapture(1) if out else None, + err=capture.FDCapture(2) if err else None, + ) def test_stdout(): os.close(1) cap = StdCaptureFD(out=True, err=False, in_=False) - assert fnmatch(repr(cap.out), "") + assert fnmatch(repr(cap.out), "") cap.start_capturing() os.write(1, b"stdout") assert cap.readouterr() == ("stdout", "") @@ -1168,7 +1184,7 @@ class TestStdCaptureFDinvalidFD: def test_stderr(): os.close(2) cap = StdCaptureFD(out=False, err=True, in_=False) - assert fnmatch(repr(cap.err), "") + assert fnmatch(repr(cap.err), "") cap.start_capturing() os.write(2, b"stderr") assert cap.readouterr() == ("", "stderr") @@ -1177,7 +1193,7 @@ class TestStdCaptureFDinvalidFD: def test_stdin(): os.close(0) cap = StdCaptureFD(out=False, err=False, in_=True) - assert fnmatch(repr(cap.in_), "") + assert fnmatch(repr(cap.in_), "") cap.stop_capturing() """ ) @@ -1239,11 +1255,8 @@ def test_capsys_results_accessible_by_attribute(capsys): assert capture_result.err == "eggs" -@pytest.mark.parametrize("use", [True, False]) -def test_fdcapture_tmpfile_remains_the_same(tmpfile, use): - if not use: - tmpfile = True - cap = StdCaptureFD(out=False, err=tmpfile) +def test_fdcapture_tmpfile_remains_the_same() -> None: + cap = StdCaptureFD(out=False, err=True) try: cap.start_capturing() capfile = cap.err.tmpfile @@ -1276,16 +1289,21 @@ def test_close_and_capture_again(testdir): ) -@pytest.mark.parametrize("method", ["SysCapture", "FDCapture", "TeeSysCapture"]) -def test_capturing_and_logging_fundamentals(testdir, method): +@pytest.mark.parametrize( + "method", ["SysCapture(2)", "SysCapture(2, tee=True)", "FDCapture(2)"] +) +def test_capturing_and_logging_fundamentals(testdir, method: str) -> None: # here we check a fundamental feature p = testdir.makepyfile( """ import sys, os import py, logging from _pytest import capture - cap = capture.MultiCapture(out=False, in_=False, - Capture=capture.%s) + cap = capture.MultiCapture( + in_=None, + out=None, + err=capture.%s, + ) cap.start_capturing() logging.warning("hello1")