Merge pull request #3832 from Sup3rGeo/bugfix/capsys-with-cli-logging
Bugfix/capsys with cli logging (again)
This commit is contained in:
		
						commit
						f1079a8222
					
				| 
						 | 
					@ -14,7 +14,7 @@ from tempfile import TemporaryFile
 | 
				
			||||||
 | 
					
 | 
				
			||||||
import six
 | 
					import six
 | 
				
			||||||
import pytest
 | 
					import pytest
 | 
				
			||||||
from _pytest.compat import CaptureIO, dummy_context_manager
 | 
					from _pytest.compat import CaptureIO
 | 
				
			||||||
 | 
					
 | 
				
			||||||
patchsysdict = {0: "stdin", 1: "stdout", 2: "stderr"}
 | 
					patchsysdict = {0: "stdin", 1: "stdout", 2: "stderr"}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
| 
						 | 
					@ -62,8 +62,9 @@ def pytest_load_initial_conftests(early_config, parser, args):
 | 
				
			||||||
    # finally trigger conftest loading but while capturing (issue93)
 | 
					    # finally trigger conftest loading but while capturing (issue93)
 | 
				
			||||||
    capman.start_global_capturing()
 | 
					    capman.start_global_capturing()
 | 
				
			||||||
    outcome = yield
 | 
					    outcome = yield
 | 
				
			||||||
    out, err = capman.suspend_global_capture()
 | 
					    capman.suspend_global_capture()
 | 
				
			||||||
    if outcome.excinfo is not None:
 | 
					    if outcome.excinfo is not None:
 | 
				
			||||||
 | 
					        out, err = capman.read_global_capture()
 | 
				
			||||||
        sys.stdout.write(out)
 | 
					        sys.stdout.write(out)
 | 
				
			||||||
        sys.stderr.write(err)
 | 
					        sys.stderr.write(err)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
| 
						 | 
					@ -96,6 +97,8 @@ class CaptureManager(object):
 | 
				
			||||||
        else:
 | 
					        else:
 | 
				
			||||||
            raise ValueError("unknown capturing method: %r" % method)
 | 
					            raise ValueError("unknown capturing method: %r" % method)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    # Global capturing control
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    def start_global_capturing(self):
 | 
					    def start_global_capturing(self):
 | 
				
			||||||
        assert self._global_capturing is None
 | 
					        assert self._global_capturing is None
 | 
				
			||||||
        self._global_capturing = self._getcapture(self._method)
 | 
					        self._global_capturing = self._getcapture(self._method)
 | 
				
			||||||
| 
						 | 
					@ -110,29 +113,15 @@ class CaptureManager(object):
 | 
				
			||||||
    def resume_global_capture(self):
 | 
					    def resume_global_capture(self):
 | 
				
			||||||
        self._global_capturing.resume_capturing()
 | 
					        self._global_capturing.resume_capturing()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    def suspend_global_capture(self, item=None, in_=False):
 | 
					    def suspend_global_capture(self, in_=False):
 | 
				
			||||||
        if item is not None:
 | 
					 | 
				
			||||||
            self.deactivate_fixture(item)
 | 
					 | 
				
			||||||
        cap = getattr(self, "_global_capturing", None)
 | 
					        cap = getattr(self, "_global_capturing", None)
 | 
				
			||||||
        if cap is not None:
 | 
					        if cap is not None:
 | 
				
			||||||
            try:
 | 
					            cap.suspend_capturing(in_=in_)
 | 
				
			||||||
                outerr = cap.readouterr()
 | 
					 | 
				
			||||||
            finally:
 | 
					 | 
				
			||||||
                cap.suspend_capturing(in_=in_)
 | 
					 | 
				
			||||||
            return outerr
 | 
					 | 
				
			||||||
 | 
					
 | 
				
			||||||
    @contextlib.contextmanager
 | 
					    def read_global_capture(self):
 | 
				
			||||||
    def global_and_fixture_disabled(self):
 | 
					        return self._global_capturing.readouterr()
 | 
				
			||||||
        """Context manager to temporarily disables global and current fixture capturing."""
 | 
					
 | 
				
			||||||
        # Need to undo local capsys-et-al if exists before disabling global capture
 | 
					    # Fixture Control (its just forwarding, think about removing this later)
 | 
				
			||||||
        fixture = getattr(self._current_item, "_capture_fixture", None)
 | 
					 | 
				
			||||||
        ctx_manager = fixture._suspend() if fixture else dummy_context_manager()
 | 
					 | 
				
			||||||
        with ctx_manager:
 | 
					 | 
				
			||||||
            self.suspend_global_capture(item=None, in_=False)
 | 
					 | 
				
			||||||
            try:
 | 
					 | 
				
			||||||
                yield
 | 
					 | 
				
			||||||
            finally:
 | 
					 | 
				
			||||||
                self.resume_global_capture()
 | 
					 | 
				
			||||||
 | 
					
 | 
				
			||||||
    def activate_fixture(self, item):
 | 
					    def activate_fixture(self, item):
 | 
				
			||||||
        """If the current item is using ``capsys`` or ``capfd``, activate them so they take precedence over
 | 
					        """If the current item is using ``capsys`` or ``capfd``, activate them so they take precedence over
 | 
				
			||||||
| 
						 | 
					@ -148,12 +137,53 @@ class CaptureManager(object):
 | 
				
			||||||
        if fixture is not None:
 | 
					        if fixture is not None:
 | 
				
			||||||
            fixture.close()
 | 
					            fixture.close()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    def suspend_fixture(self, item):
 | 
				
			||||||
 | 
					        fixture = getattr(item, "_capture_fixture", None)
 | 
				
			||||||
 | 
					        if fixture is not None:
 | 
				
			||||||
 | 
					            fixture._suspend()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    def resume_fixture(self, item):
 | 
				
			||||||
 | 
					        fixture = getattr(item, "_capture_fixture", None)
 | 
				
			||||||
 | 
					        if fixture is not None:
 | 
				
			||||||
 | 
					            fixture._resume()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    # Helper context managers
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    @contextlib.contextmanager
 | 
				
			||||||
 | 
					    def global_and_fixture_disabled(self):
 | 
				
			||||||
 | 
					        """Context manager to temporarily disables global and current fixture capturing."""
 | 
				
			||||||
 | 
					        # Need to undo local capsys-et-al if exists before disabling global capture
 | 
				
			||||||
 | 
					        self.suspend_fixture(self._current_item)
 | 
				
			||||||
 | 
					        self.suspend_global_capture(in_=False)
 | 
				
			||||||
 | 
					        try:
 | 
				
			||||||
 | 
					            yield
 | 
				
			||||||
 | 
					        finally:
 | 
				
			||||||
 | 
					            self.resume_global_capture()
 | 
				
			||||||
 | 
					            self.resume_fixture(self._current_item)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    @contextlib.contextmanager
 | 
				
			||||||
 | 
					    def item_capture(self, when, item):
 | 
				
			||||||
 | 
					        self.resume_global_capture()
 | 
				
			||||||
 | 
					        self.activate_fixture(item)
 | 
				
			||||||
 | 
					        try:
 | 
				
			||||||
 | 
					            yield
 | 
				
			||||||
 | 
					        finally:
 | 
				
			||||||
 | 
					            self.deactivate_fixture(item)
 | 
				
			||||||
 | 
					            self.suspend_global_capture(in_=False)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        out, err = self.read_global_capture()
 | 
				
			||||||
 | 
					        item.add_report_section(when, "stdout", out)
 | 
				
			||||||
 | 
					        item.add_report_section(when, "stderr", err)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    # Hooks
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    @pytest.hookimpl(hookwrapper=True)
 | 
					    @pytest.hookimpl(hookwrapper=True)
 | 
				
			||||||
    def pytest_make_collect_report(self, collector):
 | 
					    def pytest_make_collect_report(self, collector):
 | 
				
			||||||
        if isinstance(collector, pytest.File):
 | 
					        if isinstance(collector, pytest.File):
 | 
				
			||||||
            self.resume_global_capture()
 | 
					            self.resume_global_capture()
 | 
				
			||||||
            outcome = yield
 | 
					            outcome = yield
 | 
				
			||||||
            out, err = self.suspend_global_capture()
 | 
					            self.suspend_global_capture()
 | 
				
			||||||
 | 
					            out, err = self.read_global_capture()
 | 
				
			||||||
            rep = outcome.get_result()
 | 
					            rep = outcome.get_result()
 | 
				
			||||||
            if out:
 | 
					            if out:
 | 
				
			||||||
                rep.sections.append(("Captured stdout", out))
 | 
					                rep.sections.append(("Captured stdout", out))
 | 
				
			||||||
| 
						 | 
					@ -163,35 +193,25 @@ class CaptureManager(object):
 | 
				
			||||||
            yield
 | 
					            yield
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    @pytest.hookimpl(hookwrapper=True)
 | 
					    @pytest.hookimpl(hookwrapper=True)
 | 
				
			||||||
    def pytest_runtest_setup(self, item):
 | 
					    def pytest_runtest_protocol(self, item):
 | 
				
			||||||
        self._current_item = item
 | 
					        self._current_item = item
 | 
				
			||||||
        self.resume_global_capture()
 | 
					 | 
				
			||||||
        # no need to activate a capture fixture because they activate themselves during creation; this
 | 
					 | 
				
			||||||
        # only makes sense when a fixture uses a capture fixture, otherwise the capture fixture will
 | 
					 | 
				
			||||||
        # be activated during pytest_runtest_call
 | 
					 | 
				
			||||||
        yield
 | 
					        yield
 | 
				
			||||||
        self.suspend_capture_item(item, "setup")
 | 
					 | 
				
			||||||
        self._current_item = None
 | 
					        self._current_item = None
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    @pytest.hookimpl(hookwrapper=True)
 | 
				
			||||||
 | 
					    def pytest_runtest_setup(self, item):
 | 
				
			||||||
 | 
					        with self.item_capture("setup", item):
 | 
				
			||||||
 | 
					            yield
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    @pytest.hookimpl(hookwrapper=True)
 | 
					    @pytest.hookimpl(hookwrapper=True)
 | 
				
			||||||
    def pytest_runtest_call(self, item):
 | 
					    def pytest_runtest_call(self, item):
 | 
				
			||||||
        self._current_item = item
 | 
					        with self.item_capture("call", item):
 | 
				
			||||||
        self.resume_global_capture()
 | 
					            yield
 | 
				
			||||||
        # it is important to activate this fixture during the call phase so it overwrites the "global"
 | 
					 | 
				
			||||||
        # capture
 | 
					 | 
				
			||||||
        self.activate_fixture(item)
 | 
					 | 
				
			||||||
        yield
 | 
					 | 
				
			||||||
        self.suspend_capture_item(item, "call")
 | 
					 | 
				
			||||||
        self._current_item = None
 | 
					 | 
				
			||||||
 | 
					
 | 
				
			||||||
    @pytest.hookimpl(hookwrapper=True)
 | 
					    @pytest.hookimpl(hookwrapper=True)
 | 
				
			||||||
    def pytest_runtest_teardown(self, item):
 | 
					    def pytest_runtest_teardown(self, item):
 | 
				
			||||||
        self._current_item = item
 | 
					        with self.item_capture("teardown", item):
 | 
				
			||||||
        self.resume_global_capture()
 | 
					            yield
 | 
				
			||||||
        self.activate_fixture(item)
 | 
					 | 
				
			||||||
        yield
 | 
					 | 
				
			||||||
        self.suspend_capture_item(item, "teardown")
 | 
					 | 
				
			||||||
        self._current_item = None
 | 
					 | 
				
			||||||
 | 
					
 | 
				
			||||||
    @pytest.hookimpl(tryfirst=True)
 | 
					    @pytest.hookimpl(tryfirst=True)
 | 
				
			||||||
    def pytest_keyboard_interrupt(self, excinfo):
 | 
					    def pytest_keyboard_interrupt(self, excinfo):
 | 
				
			||||||
| 
						 | 
					@ -201,11 +221,6 @@ class CaptureManager(object):
 | 
				
			||||||
    def pytest_internalerror(self, excinfo):
 | 
					    def pytest_internalerror(self, excinfo):
 | 
				
			||||||
        self.stop_global_capturing()
 | 
					        self.stop_global_capturing()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    def suspend_capture_item(self, item, when, in_=False):
 | 
					 | 
				
			||||||
        out, err = self.suspend_global_capture(item, in_=in_)
 | 
					 | 
				
			||||||
        item.add_report_section(when, "stdout", out)
 | 
					 | 
				
			||||||
        item.add_report_section(when, "stderr", err)
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
 | 
					
 | 
				
			||||||
capture_fixtures = {"capfd", "capfdbinary", "capsys", "capsysbinary"}
 | 
					capture_fixtures = {"capfd", "capfdbinary", "capsys", "capsysbinary"}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
| 
						 | 
					@ -314,10 +329,12 @@ class CaptureFixture(object):
 | 
				
			||||||
        self._captured_err = self.captureclass.EMPTY_BUFFER
 | 
					        self._captured_err = self.captureclass.EMPTY_BUFFER
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    def _start(self):
 | 
					    def _start(self):
 | 
				
			||||||
        self._capture = MultiCapture(
 | 
					        # Start if not started yet
 | 
				
			||||||
            out=True, err=True, in_=False, Capture=self.captureclass
 | 
					        if getattr(self, "_capture", None) is None:
 | 
				
			||||||
        )
 | 
					            self._capture = MultiCapture(
 | 
				
			||||||
        self._capture.start_capturing()
 | 
					                out=True, err=True, in_=False, Capture=self.captureclass
 | 
				
			||||||
 | 
					            )
 | 
				
			||||||
 | 
					            self._capture.start_capturing()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    def close(self):
 | 
					    def close(self):
 | 
				
			||||||
        if self._capture is not None:
 | 
					        if self._capture is not None:
 | 
				
			||||||
| 
						 | 
					@ -341,14 +358,13 @@ class CaptureFixture(object):
 | 
				
			||||||
        self._captured_err = self.captureclass.EMPTY_BUFFER
 | 
					        self._captured_err = self.captureclass.EMPTY_BUFFER
 | 
				
			||||||
        return CaptureResult(captured_out, captured_err)
 | 
					        return CaptureResult(captured_out, captured_err)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    @contextlib.contextmanager
 | 
					 | 
				
			||||||
    def _suspend(self):
 | 
					    def _suspend(self):
 | 
				
			||||||
        """Suspends this fixture's own capturing temporarily."""
 | 
					        """Suspends this fixture's own capturing temporarily."""
 | 
				
			||||||
        self._capture.suspend_capturing()
 | 
					        self._capture.suspend_capturing()
 | 
				
			||||||
        try:
 | 
					
 | 
				
			||||||
            yield
 | 
					    def _resume(self):
 | 
				
			||||||
        finally:
 | 
					        """Resumes this fixture's own capturing temporarily."""
 | 
				
			||||||
            self._capture.resume_capturing()
 | 
					        self._capture.resume_capturing()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    @contextlib.contextmanager
 | 
					    @contextlib.contextmanager
 | 
				
			||||||
    def disabled(self):
 | 
					    def disabled(self):
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
| 
						 | 
					@ -102,7 +102,8 @@ class PdbInvoke(object):
 | 
				
			||||||
    def pytest_exception_interact(self, node, call, report):
 | 
					    def pytest_exception_interact(self, node, call, report):
 | 
				
			||||||
        capman = node.config.pluginmanager.getplugin("capturemanager")
 | 
					        capman = node.config.pluginmanager.getplugin("capturemanager")
 | 
				
			||||||
        if capman:
 | 
					        if capman:
 | 
				
			||||||
            out, err = capman.suspend_global_capture(in_=True)
 | 
					            capman.suspend_global_capture(in_=True)
 | 
				
			||||||
 | 
					            out, err = capman.read_global_capture()
 | 
				
			||||||
            sys.stdout.write(out)
 | 
					            sys.stdout.write(out)
 | 
				
			||||||
            sys.stdout.write(err)
 | 
					            sys.stdout.write(err)
 | 
				
			||||||
        _enter_pdb(node, call.excinfo, report)
 | 
					        _enter_pdb(node, call.excinfo, report)
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
| 
						 | 
					@ -51,7 +51,8 @@ def _show_fixture_action(fixturedef, msg):
 | 
				
			||||||
    config = fixturedef._fixturemanager.config
 | 
					    config = fixturedef._fixturemanager.config
 | 
				
			||||||
    capman = config.pluginmanager.getplugin("capturemanager")
 | 
					    capman = config.pluginmanager.getplugin("capturemanager")
 | 
				
			||||||
    if capman:
 | 
					    if capman:
 | 
				
			||||||
        out, err = capman.suspend_global_capture()
 | 
					        capman.suspend_global_capture()
 | 
				
			||||||
 | 
					        out, err = capman.read_global_capture()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    tw = config.get_terminal_writer()
 | 
					    tw = config.get_terminal_writer()
 | 
				
			||||||
    tw.line()
 | 
					    tw.line()
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
| 
						 | 
					@ -878,7 +878,6 @@ def test_live_logging_suspends_capture(has_capture_manager, request):
 | 
				
			||||||
    import logging
 | 
					    import logging
 | 
				
			||||||
    import contextlib
 | 
					    import contextlib
 | 
				
			||||||
    from functools import partial
 | 
					    from functools import partial
 | 
				
			||||||
    from _pytest.capture import CaptureManager
 | 
					 | 
				
			||||||
    from _pytest.logging import _LiveLoggingStreamHandler
 | 
					    from _pytest.logging import _LiveLoggingStreamHandler
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    class MockCaptureManager:
 | 
					    class MockCaptureManager:
 | 
				
			||||||
| 
						 | 
					@ -890,10 +889,6 @@ def test_live_logging_suspends_capture(has_capture_manager, request):
 | 
				
			||||||
            yield
 | 
					            yield
 | 
				
			||||||
            self.calls.append("exit disabled")
 | 
					            self.calls.append("exit disabled")
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    # sanity check
 | 
					 | 
				
			||||||
    assert CaptureManager.suspend_capture_item
 | 
					 | 
				
			||||||
    assert CaptureManager.resume_global_capture
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
    class DummyTerminal(six.StringIO):
 | 
					    class DummyTerminal(six.StringIO):
 | 
				
			||||||
        def section(self, *args, **kwargs):
 | 
					        def section(self, *args, **kwargs):
 | 
				
			||||||
            pass
 | 
					            pass
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
| 
						 | 
					@ -70,19 +70,23 @@ class TestCaptureManager(object):
 | 
				
			||||||
        try:
 | 
					        try:
 | 
				
			||||||
            capman = CaptureManager(method)
 | 
					            capman = CaptureManager(method)
 | 
				
			||||||
            capman.start_global_capturing()
 | 
					            capman.start_global_capturing()
 | 
				
			||||||
            outerr = capman.suspend_global_capture()
 | 
					            capman.suspend_global_capture()
 | 
				
			||||||
 | 
					            outerr = capman.read_global_capture()
 | 
				
			||||||
            assert outerr == ("", "")
 | 
					            assert outerr == ("", "")
 | 
				
			||||||
            outerr = capman.suspend_global_capture()
 | 
					            capman.suspend_global_capture()
 | 
				
			||||||
 | 
					            outerr = capman.read_global_capture()
 | 
				
			||||||
            assert outerr == ("", "")
 | 
					            assert outerr == ("", "")
 | 
				
			||||||
            print("hello")
 | 
					            print("hello")
 | 
				
			||||||
            out, err = capman.suspend_global_capture()
 | 
					            capman.suspend_global_capture()
 | 
				
			||||||
 | 
					            out, err = capman.read_global_capture()
 | 
				
			||||||
            if method == "no":
 | 
					            if method == "no":
 | 
				
			||||||
                assert old == (sys.stdout, sys.stderr, sys.stdin)
 | 
					                assert old == (sys.stdout, sys.stderr, sys.stdin)
 | 
				
			||||||
            else:
 | 
					            else:
 | 
				
			||||||
                assert not out
 | 
					                assert not out
 | 
				
			||||||
            capman.resume_global_capture()
 | 
					            capman.resume_global_capture()
 | 
				
			||||||
            print("hello")
 | 
					            print("hello")
 | 
				
			||||||
            out, err = capman.suspend_global_capture()
 | 
					            capman.suspend_global_capture()
 | 
				
			||||||
 | 
					            out, err = capman.read_global_capture()
 | 
				
			||||||
            if method != "no":
 | 
					            if method != "no":
 | 
				
			||||||
                assert out == "hello\n"
 | 
					                assert out == "hello\n"
 | 
				
			||||||
            capman.stop_global_capturing()
 | 
					            capman.stop_global_capturing()
 | 
				
			||||||
| 
						 | 
					@ -1415,9 +1419,69 @@ def test_pickling_and_unpickling_encoded_file():
 | 
				
			||||||
    pickle.loads(ef_as_str)
 | 
					    pickle.loads(ef_as_str)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
def test_capsys_with_cli_logging(testdir):
 | 
					def test_global_capture_with_live_logging(testdir):
 | 
				
			||||||
    # Issue 3819
 | 
					    # Issue 3819
 | 
				
			||||||
    # capsys should work with real-time cli logging
 | 
					    # capture should work with live cli logging
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    # Teardown report seems to have the capture for the whole process (setup, capture, teardown)
 | 
				
			||||||
 | 
					    testdir.makeconftest(
 | 
				
			||||||
 | 
					        """
 | 
				
			||||||
 | 
					        def pytest_runtest_logreport(report):
 | 
				
			||||||
 | 
					            if "test_global" in report.nodeid:
 | 
				
			||||||
 | 
					                if report.when == "teardown":
 | 
				
			||||||
 | 
					                    with open("caplog", "w") as f:
 | 
				
			||||||
 | 
					                        f.write(report.caplog)
 | 
				
			||||||
 | 
					                    with open("capstdout", "w") as f:
 | 
				
			||||||
 | 
					                        f.write(report.capstdout)
 | 
				
			||||||
 | 
					        """
 | 
				
			||||||
 | 
					    )
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    testdir.makepyfile(
 | 
				
			||||||
 | 
					        """
 | 
				
			||||||
 | 
					        import logging
 | 
				
			||||||
 | 
					        import sys
 | 
				
			||||||
 | 
					        import pytest
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        logger = logging.getLogger(__name__)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        @pytest.fixture
 | 
				
			||||||
 | 
					        def fix1():
 | 
				
			||||||
 | 
					            print("fix setup")
 | 
				
			||||||
 | 
					            logging.info("fix setup")
 | 
				
			||||||
 | 
					            yield
 | 
				
			||||||
 | 
					            logging.info("fix teardown")
 | 
				
			||||||
 | 
					            print("fix teardown")
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        def test_global(fix1):
 | 
				
			||||||
 | 
					            print("begin test")
 | 
				
			||||||
 | 
					            logging.info("something in test")
 | 
				
			||||||
 | 
					            print("end test")
 | 
				
			||||||
 | 
					        """
 | 
				
			||||||
 | 
					    )
 | 
				
			||||||
 | 
					    result = testdir.runpytest_subprocess("--log-cli-level=INFO")
 | 
				
			||||||
 | 
					    assert result.ret == 0
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    with open("caplog", "r") as f:
 | 
				
			||||||
 | 
					        caplog = f.read()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    assert "fix setup" in caplog
 | 
				
			||||||
 | 
					    assert "something in test" in caplog
 | 
				
			||||||
 | 
					    assert "fix teardown" in caplog
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    with open("capstdout", "r") as f:
 | 
				
			||||||
 | 
					        capstdout = f.read()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    assert "fix setup" in capstdout
 | 
				
			||||||
 | 
					    assert "begin test" in capstdout
 | 
				
			||||||
 | 
					    assert "end test" in capstdout
 | 
				
			||||||
 | 
					    assert "fix teardown" in capstdout
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					@pytest.mark.parametrize("capture_fixture", ["capsys", "capfd"])
 | 
				
			||||||
 | 
					def test_capture_with_live_logging(testdir, capture_fixture):
 | 
				
			||||||
 | 
					    # Issue 3819
 | 
				
			||||||
 | 
					    # capture should work with live cli logging
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    testdir.makepyfile(
 | 
					    testdir.makepyfile(
 | 
				
			||||||
        """
 | 
					        """
 | 
				
			||||||
        import logging
 | 
					        import logging
 | 
				
			||||||
| 
						 | 
					@ -1425,22 +1489,23 @@ def test_capsys_with_cli_logging(testdir):
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        logger = logging.getLogger(__name__)
 | 
					        logger = logging.getLogger(__name__)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        def test_myoutput(capsys):  # or use "capfd" for fd-level
 | 
					        def test_capture({0}):
 | 
				
			||||||
            print("hello")
 | 
					            print("hello")
 | 
				
			||||||
            sys.stderr.write("world\\n")
 | 
					            sys.stderr.write("world\\n")
 | 
				
			||||||
            captured = capsys.readouterr()
 | 
					            captured = {0}.readouterr()
 | 
				
			||||||
            assert captured.out == "hello\\n"
 | 
					            assert captured.out == "hello\\n"
 | 
				
			||||||
            assert captured.err == "world\\n"
 | 
					            assert captured.err == "world\\n"
 | 
				
			||||||
 | 
					
 | 
				
			||||||
            logging.info("something")
 | 
					            logging.info("something")
 | 
				
			||||||
 | 
					 | 
				
			||||||
            print("next")
 | 
					            print("next")
 | 
				
			||||||
 | 
					 | 
				
			||||||
            logging.info("something")
 | 
					            logging.info("something")
 | 
				
			||||||
 | 
					
 | 
				
			||||||
            captured = capsys.readouterr()
 | 
					            captured = {0}.readouterr()
 | 
				
			||||||
            assert captured.out == "next\\n"
 | 
					            assert captured.out == "next\\n"
 | 
				
			||||||
        """
 | 
					        """.format(
 | 
				
			||||||
 | 
					            capture_fixture
 | 
				
			||||||
 | 
					        )
 | 
				
			||||||
    )
 | 
					    )
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    result = testdir.runpytest_subprocess("--log-cli-level=INFO")
 | 
					    result = testdir.runpytest_subprocess("--log-cli-level=INFO")
 | 
				
			||||||
    assert result.ret == 0
 | 
					    assert result.ret == 0
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
		Loading…
	
		Reference in New Issue