Merge pull request #3822 from Sup3rGeo/bugfix/capsys-with-cli-logging
Bugfix/capsys with cli logging
This commit is contained in:
commit
28aff051ab
1
AUTHORS
1
AUTHORS
|
@ -206,6 +206,7 @@ Trevor Bekolay
|
||||||
Tyler Goodlet
|
Tyler Goodlet
|
||||||
Tzu-ping Chung
|
Tzu-ping Chung
|
||||||
Vasily Kuznetsov
|
Vasily Kuznetsov
|
||||||
|
Victor Maryama
|
||||||
Victor Uriarte
|
Victor Uriarte
|
||||||
Vidar T. Fauske
|
Vidar T. Fauske
|
||||||
Vitaly Lashmanov
|
Vitaly Lashmanov
|
||||||
|
|
|
@ -0,0 +1 @@
|
||||||
|
Fix ``stdout/stderr`` not getting captured when real-time cli logging is active.
|
|
@ -14,8 +14,7 @@ from tempfile import TemporaryFile
|
||||||
|
|
||||||
import six
|
import six
|
||||||
import pytest
|
import pytest
|
||||||
from _pytest.compat import CaptureIO
|
from _pytest.compat import CaptureIO, dummy_context_manager
|
||||||
|
|
||||||
|
|
||||||
patchsysdict = {0: "stdin", 1: "stdout", 2: "stderr"}
|
patchsysdict = {0: "stdin", 1: "stdout", 2: "stderr"}
|
||||||
|
|
||||||
|
@ -85,6 +84,7 @@ class CaptureManager(object):
|
||||||
def __init__(self, method):
|
def __init__(self, method):
|
||||||
self._method = method
|
self._method = method
|
||||||
self._global_capturing = None
|
self._global_capturing = None
|
||||||
|
self._current_item = None
|
||||||
|
|
||||||
def _getcapture(self, method):
|
def _getcapture(self, method):
|
||||||
if method == "fd":
|
if method == "fd":
|
||||||
|
@ -121,6 +121,19 @@ class CaptureManager(object):
|
||||||
cap.suspend_capturing(in_=in_)
|
cap.suspend_capturing(in_=in_)
|
||||||
return outerr
|
return outerr
|
||||||
|
|
||||||
|
@contextlib.contextmanager
|
||||||
|
def global_and_fixture_disabled(self):
|
||||||
|
"""Context manager to temporarily disables global and current fixture capturing."""
|
||||||
|
# Need to undo local capsys-et-al if exists before disabling global capture
|
||||||
|
fixture = getattr(self._current_item, "_capture_fixture", None)
|
||||||
|
ctx_manager = fixture._suspend() if fixture else dummy_context_manager()
|
||||||
|
with ctx_manager:
|
||||||
|
self.suspend_global_capture(item=None, in_=False)
|
||||||
|
try:
|
||||||
|
yield
|
||||||
|
finally:
|
||||||
|
self.resume_global_capture()
|
||||||
|
|
||||||
def activate_fixture(self, item):
|
def activate_fixture(self, item):
|
||||||
"""If the current item is using ``capsys`` or ``capfd``, activate them so they take precedence over
|
"""If the current item is using ``capsys`` or ``capfd``, activate them so they take precedence over
|
||||||
the global capture.
|
the global capture.
|
||||||
|
@ -151,28 +164,34 @@ class CaptureManager(object):
|
||||||
|
|
||||||
@pytest.hookimpl(hookwrapper=True)
|
@pytest.hookimpl(hookwrapper=True)
|
||||||
def pytest_runtest_setup(self, item):
|
def pytest_runtest_setup(self, item):
|
||||||
|
self._current_item = item
|
||||||
self.resume_global_capture()
|
self.resume_global_capture()
|
||||||
# no need to activate a capture fixture because they activate themselves during creation; this
|
# no need to activate a capture fixture because they activate themselves during creation; this
|
||||||
# only makes sense when a fixture uses a capture fixture, otherwise the capture fixture will
|
# only makes sense when a fixture uses a capture fixture, otherwise the capture fixture will
|
||||||
# be activated during pytest_runtest_call
|
# be activated during pytest_runtest_call
|
||||||
yield
|
yield
|
||||||
self.suspend_capture_item(item, "setup")
|
self.suspend_capture_item(item, "setup")
|
||||||
|
self._current_item = None
|
||||||
|
|
||||||
@pytest.hookimpl(hookwrapper=True)
|
@pytest.hookimpl(hookwrapper=True)
|
||||||
def pytest_runtest_call(self, item):
|
def pytest_runtest_call(self, item):
|
||||||
|
self._current_item = item
|
||||||
self.resume_global_capture()
|
self.resume_global_capture()
|
||||||
# it is important to activate this fixture during the call phase so it overwrites the "global"
|
# it is important to activate this fixture during the call phase so it overwrites the "global"
|
||||||
# capture
|
# capture
|
||||||
self.activate_fixture(item)
|
self.activate_fixture(item)
|
||||||
yield
|
yield
|
||||||
self.suspend_capture_item(item, "call")
|
self.suspend_capture_item(item, "call")
|
||||||
|
self._current_item = None
|
||||||
|
|
||||||
@pytest.hookimpl(hookwrapper=True)
|
@pytest.hookimpl(hookwrapper=True)
|
||||||
def pytest_runtest_teardown(self, item):
|
def pytest_runtest_teardown(self, item):
|
||||||
|
self._current_item = item
|
||||||
self.resume_global_capture()
|
self.resume_global_capture()
|
||||||
self.activate_fixture(item)
|
self.activate_fixture(item)
|
||||||
yield
|
yield
|
||||||
self.suspend_capture_item(item, "teardown")
|
self.suspend_capture_item(item, "teardown")
|
||||||
|
self._current_item = None
|
||||||
|
|
||||||
@pytest.hookimpl(tryfirst=True)
|
@pytest.hookimpl(tryfirst=True)
|
||||||
def pytest_keyboard_interrupt(self, excinfo):
|
def pytest_keyboard_interrupt(self, excinfo):
|
||||||
|
@ -314,17 +333,21 @@ class CaptureFixture(object):
|
||||||
return self._outerr
|
return self._outerr
|
||||||
|
|
||||||
@contextlib.contextmanager
|
@contextlib.contextmanager
|
||||||
def disabled(self):
|
def _suspend(self):
|
||||||
"""Temporarily disables capture while inside the 'with' block."""
|
"""Suspends this fixture's own capturing temporarily."""
|
||||||
self._capture.suspend_capturing()
|
self._capture.suspend_capturing()
|
||||||
capmanager = self.request.config.pluginmanager.getplugin("capturemanager")
|
|
||||||
capmanager.suspend_global_capture(item=None, in_=False)
|
|
||||||
try:
|
try:
|
||||||
yield
|
yield
|
||||||
finally:
|
finally:
|
||||||
capmanager.resume_global_capture()
|
|
||||||
self._capture.resume_capturing()
|
self._capture.resume_capturing()
|
||||||
|
|
||||||
|
@contextlib.contextmanager
|
||||||
|
def disabled(self):
|
||||||
|
"""Temporarily disables capture while inside the 'with' block."""
|
||||||
|
capmanager = self.request.config.pluginmanager.getplugin("capturemanager")
|
||||||
|
with capmanager.global_and_fixture_disabled():
|
||||||
|
yield
|
||||||
|
|
||||||
|
|
||||||
def safe_text_dupfile(f, mode, default_encoding="UTF8"):
|
def safe_text_dupfile(f, mode, default_encoding="UTF8"):
|
||||||
""" return an open text file object that's a duplicate of f on the
|
""" return an open text file object that's a duplicate of f on the
|
||||||
|
|
|
@ -8,6 +8,7 @@ import functools
|
||||||
import inspect
|
import inspect
|
||||||
import re
|
import re
|
||||||
import sys
|
import sys
|
||||||
|
from contextlib import contextmanager
|
||||||
|
|
||||||
import py
|
import py
|
||||||
|
|
||||||
|
@ -151,6 +152,13 @@ def getfuncargnames(function, is_method=False, cls=None):
|
||||||
return arg_names
|
return arg_names
|
||||||
|
|
||||||
|
|
||||||
|
@contextmanager
|
||||||
|
def dummy_context_manager():
|
||||||
|
"""Context manager that does nothing, useful in situations where you might need an actual context manager or not
|
||||||
|
depending on some condition. Using this allow to keep the same code"""
|
||||||
|
yield
|
||||||
|
|
||||||
|
|
||||||
def get_default_arg_names(function):
|
def get_default_arg_names(function):
|
||||||
# Note: this code intentionally mirrors the code at the beginning of getfuncargnames,
|
# Note: this code intentionally mirrors the code at the beginning of getfuncargnames,
|
||||||
# to get the arguments which were excluded from its result because they had default values
|
# to get the arguments which were excluded from its result because they had default values
|
||||||
|
|
|
@ -6,6 +6,7 @@ from contextlib import closing, contextmanager
|
||||||
import re
|
import re
|
||||||
import six
|
import six
|
||||||
|
|
||||||
|
from _pytest.compat import dummy_context_manager
|
||||||
from _pytest.config import create_terminal_writer
|
from _pytest.config import create_terminal_writer
|
||||||
import pytest
|
import pytest
|
||||||
import py
|
import py
|
||||||
|
@ -369,11 +370,6 @@ def pytest_configure(config):
|
||||||
config.pluginmanager.register(LoggingPlugin(config), "logging-plugin")
|
config.pluginmanager.register(LoggingPlugin(config), "logging-plugin")
|
||||||
|
|
||||||
|
|
||||||
@contextmanager
|
|
||||||
def _dummy_context_manager():
|
|
||||||
yield
|
|
||||||
|
|
||||||
|
|
||||||
class LoggingPlugin(object):
|
class LoggingPlugin(object):
|
||||||
"""Attaches to the logging module and captures log messages for each test.
|
"""Attaches to the logging module and captures log messages for each test.
|
||||||
"""
|
"""
|
||||||
|
@ -537,7 +533,7 @@ class LoggingPlugin(object):
|
||||||
log_cli_handler, formatter=log_cli_formatter, level=log_cli_level
|
log_cli_handler, formatter=log_cli_formatter, level=log_cli_level
|
||||||
)
|
)
|
||||||
else:
|
else:
|
||||||
self.live_logs_context = _dummy_context_manager()
|
self.live_logs_context = dummy_context_manager()
|
||||||
|
|
||||||
|
|
||||||
class _LiveLoggingStreamHandler(logging.StreamHandler):
|
class _LiveLoggingStreamHandler(logging.StreamHandler):
|
||||||
|
@ -572,9 +568,12 @@ class _LiveLoggingStreamHandler(logging.StreamHandler):
|
||||||
self._test_outcome_written = False
|
self._test_outcome_written = False
|
||||||
|
|
||||||
def emit(self, record):
|
def emit(self, record):
|
||||||
if self.capture_manager is not None:
|
ctx_manager = (
|
||||||
self.capture_manager.suspend_global_capture()
|
self.capture_manager.global_and_fixture_disabled()
|
||||||
try:
|
if self.capture_manager
|
||||||
|
else dummy_context_manager()
|
||||||
|
)
|
||||||
|
with ctx_manager:
|
||||||
if not self._first_record_emitted:
|
if not self._first_record_emitted:
|
||||||
self.stream.write("\n")
|
self.stream.write("\n")
|
||||||
self._first_record_emitted = True
|
self._first_record_emitted = True
|
||||||
|
@ -586,6 +585,3 @@ class _LiveLoggingStreamHandler(logging.StreamHandler):
|
||||||
self.stream.section("live log " + self._when, sep="-", bold=True)
|
self.stream.section("live log " + self._when, sep="-", bold=True)
|
||||||
self._section_name_shown = True
|
self._section_name_shown = True
|
||||||
logging.StreamHandler.emit(self, record)
|
logging.StreamHandler.emit(self, record)
|
||||||
finally:
|
|
||||||
if self.capture_manager is not None:
|
|
||||||
self.capture_manager.resume_global_capture()
|
|
||||||
|
|
|
@ -876,6 +876,7 @@ def test_live_logging_suspends_capture(has_capture_manager, request):
|
||||||
is installed.
|
is installed.
|
||||||
"""
|
"""
|
||||||
import logging
|
import logging
|
||||||
|
import contextlib
|
||||||
from functools import partial
|
from functools import partial
|
||||||
from _pytest.capture import CaptureManager
|
from _pytest.capture import CaptureManager
|
||||||
from _pytest.logging import _LiveLoggingStreamHandler
|
from _pytest.logging import _LiveLoggingStreamHandler
|
||||||
|
@ -883,11 +884,11 @@ def test_live_logging_suspends_capture(has_capture_manager, request):
|
||||||
class MockCaptureManager:
|
class MockCaptureManager:
|
||||||
calls = []
|
calls = []
|
||||||
|
|
||||||
def suspend_global_capture(self):
|
@contextlib.contextmanager
|
||||||
self.calls.append("suspend_global_capture")
|
def global_and_fixture_disabled(self):
|
||||||
|
self.calls.append("enter disabled")
|
||||||
def resume_global_capture(self):
|
yield
|
||||||
self.calls.append("resume_global_capture")
|
self.calls.append("exit disabled")
|
||||||
|
|
||||||
# sanity check
|
# sanity check
|
||||||
assert CaptureManager.suspend_capture_item
|
assert CaptureManager.suspend_capture_item
|
||||||
|
@ -908,10 +909,7 @@ def test_live_logging_suspends_capture(has_capture_manager, request):
|
||||||
|
|
||||||
logger.critical("some message")
|
logger.critical("some message")
|
||||||
if has_capture_manager:
|
if has_capture_manager:
|
||||||
assert MockCaptureManager.calls == [
|
assert MockCaptureManager.calls == ["enter disabled", "exit disabled"]
|
||||||
"suspend_global_capture",
|
|
||||||
"resume_global_capture",
|
|
||||||
]
|
|
||||||
else:
|
else:
|
||||||
assert MockCaptureManager.calls == []
|
assert MockCaptureManager.calls == []
|
||||||
assert out_file.getvalue() == "\nsome message\n"
|
assert out_file.getvalue() == "\nsome message\n"
|
||||||
|
|
|
@ -1385,3 +1385,34 @@ def test_pickling_and_unpickling_encoded_file():
|
||||||
ef = capture.EncodedFile(None, None)
|
ef = capture.EncodedFile(None, None)
|
||||||
ef_as_str = pickle.dumps(ef)
|
ef_as_str = pickle.dumps(ef)
|
||||||
pickle.loads(ef_as_str)
|
pickle.loads(ef_as_str)
|
||||||
|
|
||||||
|
|
||||||
|
def test_capsys_with_cli_logging(testdir):
|
||||||
|
# Issue 3819
|
||||||
|
# capsys should work with real-time cli logging
|
||||||
|
testdir.makepyfile(
|
||||||
|
"""
|
||||||
|
import logging
|
||||||
|
import sys
|
||||||
|
|
||||||
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
def test_myoutput(capsys): # or use "capfd" for fd-level
|
||||||
|
print("hello")
|
||||||
|
sys.stderr.write("world\\n")
|
||||||
|
captured = capsys.readouterr()
|
||||||
|
assert captured.out == "hello\\n"
|
||||||
|
assert captured.err == "world\\n"
|
||||||
|
|
||||||
|
logging.info("something")
|
||||||
|
|
||||||
|
print("next")
|
||||||
|
|
||||||
|
logging.info("something")
|
||||||
|
|
||||||
|
captured = capsys.readouterr()
|
||||||
|
assert captured.out == "next\\n"
|
||||||
|
"""
|
||||||
|
)
|
||||||
|
result = testdir.runpytest_subprocess("--log-cli-level=INFO")
|
||||||
|
assert result.ret == 0
|
||||||
|
|
Loading…
Reference in New Issue