Refactored implementation and updated tests.

This commit is contained in:
Victor 2018-08-17 13:41:26 +02:00
parent 3059bfb1b3
commit 090f67a980
3 changed files with 33 additions and 34 deletions

View File

@ -122,23 +122,34 @@ class CaptureManager(object):
cap.suspend_capturing(in_=in_) cap.suspend_capturing(in_=in_)
return outerr return outerr
def activate_fixture(self, item=None): @contextlib.contextmanager
def disabled(self):
"""Temporarily disables capture while inside the 'with' block."""
if self._current_item is None:
yield
else:
item = self._current_item
fixture = getattr(item, "_capture_fixture", None)
if fixture is None:
yield
else:
fixture._capture.suspend_capturing()
self.suspend_global_capture(item=None, in_=False)
try:
yield
finally:
self.resume_global_capture()
fixture._capture.resume_capturing()
def activate_fixture(self, item):
"""If the current item is using ``capsys`` or ``capfd``, activate them so they take precedence over """If the current item is using ``capsys`` or ``capfd``, activate them so they take precedence over
the global capture. the global capture.
""" """
if item is None:
if self._current_item is None:
return
item = self._current_item
fixture = getattr(item, "_capture_fixture", None) fixture = getattr(item, "_capture_fixture", None)
if fixture is not None: if fixture is not None:
fixture._start() fixture._start()
def deactivate_fixture(self, item=None): def deactivate_fixture(self, item):
if item is None:
if self._current_item is None:
return
item = self._current_item
"""Deactivates the ``capsys`` or ``capfd`` fixture of this item, if any.""" """Deactivates the ``capsys`` or ``capfd`` fixture of this item, if any."""
fixture = getattr(item, "_capture_fixture", None) fixture = getattr(item, "_capture_fixture", None)
if fixture is not None: if fixture is not None:

View File

@ -573,9 +573,11 @@ class _LiveLoggingStreamHandler(logging.StreamHandler):
def emit(self, record): def emit(self, record):
if self.capture_manager is not None: if self.capture_manager is not None:
self.capture_manager.deactivate_fixture() ctx_manager = self.capture_manager.disabled()
self.capture_manager.suspend_global_capture() else:
try: ctx_manager = _dummy_context_manager()
with ctx_manager:
if not self._first_record_emitted: if not self._first_record_emitted:
self.stream.write("\n") self.stream.write("\n")
self._first_record_emitted = True self._first_record_emitted = True
@ -587,7 +589,3 @@ class _LiveLoggingStreamHandler(logging.StreamHandler):
self.stream.section("live log " + self._when, sep="-", bold=True) self.stream.section("live log " + self._when, sep="-", bold=True)
self._section_name_shown = True self._section_name_shown = True
logging.StreamHandler.emit(self, record) logging.StreamHandler.emit(self, record)
finally:
if self.capture_manager is not None:
self.capture_manager.resume_global_capture()
self.capture_manager.activate_fixture()

View File

@ -876,6 +876,7 @@ def test_live_logging_suspends_capture(has_capture_manager, request):
is installed. is installed.
""" """
import logging import logging
import contextlib
from functools import partial from functools import partial
from _pytest.capture import CaptureManager from _pytest.capture import CaptureManager
from _pytest.logging import _LiveLoggingStreamHandler from _pytest.logging import _LiveLoggingStreamHandler
@ -883,17 +884,11 @@ def test_live_logging_suspends_capture(has_capture_manager, request):
class MockCaptureManager: class MockCaptureManager:
calls = [] calls = []
def suspend_global_capture(self): @contextlib.contextmanager
self.calls.append("suspend_global_capture") def disabled(self):
self.calls.append("enter disabled")
def resume_global_capture(self): yield
self.calls.append("resume_global_capture") self.calls.append("exit disabled")
def activate_fixture(self, item=None):
self.calls.append("activate_fixture")
def deactivate_fixture(self, item=None):
self.calls.append("deactivate_fixture")
# sanity check # sanity check
assert CaptureManager.suspend_capture_item assert CaptureManager.suspend_capture_item
@ -914,12 +909,7 @@ def test_live_logging_suspends_capture(has_capture_manager, request):
logger.critical("some message") logger.critical("some message")
if has_capture_manager: if has_capture_manager:
assert MockCaptureManager.calls == [ assert MockCaptureManager.calls == ["enter disabled", "exit disabled"]
"deactivate_fixture",
"suspend_global_capture",
"resume_global_capture",
"activate_fixture",
]
else: else:
assert MockCaptureManager.calls == [] assert MockCaptureManager.calls == []
assert out_file.getvalue() == "\nsome message\n" assert out_file.getvalue() == "\nsome message\n"