This commit is contained in:
Tobias Deiminger 2023-05-23 13:58:15 -04:00 committed by GitHub
commit 2b6d000459
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 129 additions and 15 deletions

View File

@ -8,11 +8,13 @@ from contextlib import nullcontext
from io import StringIO from io import StringIO
from pathlib import Path from pathlib import Path
from typing import AbstractSet from typing import AbstractSet
from typing import cast
from typing import Dict from typing import Dict
from typing import Generator from typing import Generator
from typing import List from typing import List
from typing import Mapping from typing import Mapping
from typing import Optional from typing import Optional
from typing import Set
from typing import Tuple from typing import Tuple
from typing import TYPE_CHECKING from typing import TYPE_CHECKING
from typing import TypeVar from typing import TypeVar
@ -47,12 +49,99 @@ DEFAULT_LOG_DATE_FORMAT = "%H:%M:%S"
_ANSI_ESCAPE_SEQ = re.compile(r"\x1b\[[\d;]+m") _ANSI_ESCAPE_SEQ = re.compile(r"\x1b\[[\d;]+m")
caplog_handler_key = StashKey["LogCaptureHandler"]() caplog_handler_key = StashKey["LogCaptureHandler"]()
caplog_records_key = StashKey[Dict[str, List[logging.LogRecord]]]() caplog_records_key = StashKey[Dict[str, List[logging.LogRecord]]]()
_HandlerType = TypeVar("_HandlerType", bound=logging.Handler)
def _remove_ansi_escape_sequences(text: str) -> str: def _remove_ansi_escape_sequences(text: str) -> str:
return _ANSI_ESCAPE_SEQ.sub("", text) return _ANSI_ESCAPE_SEQ.sub("", text)
class CatchedLoggers:
"""Add pytest handler to loggers on demand.
Usually it would be enough to simply add pytest handlers to the root logger at fixed stages and rely
on log propagation to capture logs from child loggers. However, if a user turns off propagation
for a child logger, we would miss their logs. The chin-ups here and in LoggerPropagateInterceptor
are mainly to capture child loggers with propagate = False, by instantly adding pytest handlers to
them.
"""
__slots__ = ("handler", "level", "loggers")
def __init__(
self,
initial_set: Set[logging.Logger],
handler: _HandlerType,
level: Optional[int] = None,
):
self.handler = handler
self.level = level
self.loggers: Dict[logging.Logger, Optional[int]] = {}
if self.level is not None:
self.handler.setLevel(self.level)
for logger in initial_set:
self.catch_logger(logger)
def catch_logger(self, logger: logging.Logger):
"""Add pytest handler and adjust level of logger. Remember state to restore."""
if logger not in self.loggers:
logger.addHandler(self.handler)
if self.level is not None:
self.loggers[logger] = logger.level
logger.setLevel(min(logger.level, self.level))
else:
self.loggers[logger] = None
def release_logger(self, logger: logging.Logger):
"""Restore log level and remove pytest handler."""
if logger in self.loggers:
logger.removeHandler(self.handler)
orig_level = self.loggers.get(logger)
if orig_level is not None:
logger.setLevel(orig_level)
del self.loggers[logger]
def release_all_loggers(self):
for logger in list(self.loggers.keys()):
self.release_logger(logger)
class LoggerPropagateInterceptor:
"""Descriptor to be patched into standard libs logging module.
It intercepts propagate = False assignments from user code.
While catching_logs is inactive, it tracks non-propagating loggers without side effects.
While catching_logs is active, it also adds and removes pytest handlers instantly.
"""
non_propagating_loggers: Set[logging.Logger] = set()
catchers: Set[CatchedLoggers] = set()
def __init__(self):
for item in logging.getLogger().manager.loggerDict:
if isinstance(item, logging.Logger) and not item.propagate:
self.non_propagating_loggers.add(item)
def __set__(self, logger: logging.Logger, propagate: bool):
if propagate is False:
if self.catchers:
for catcher in self.catchers:
catcher.catch_logger(logger)
self.non_propagating_loggers.add(logger)
elif propagate is True:
if self.catchers:
for catcher in self.catchers:
catcher.release_logger(logger)
if logger in self.non_propagating_loggers:
self.non_propagating_loggers.remove(logger)
logger.__dict__["propagate"] = propagate
# From now, we intercept each assignment to logger.propagate
logging.Logger.propagate = cast(bool, LoggerPropagateInterceptor())
class ColoredLevelFormatter(logging.Formatter): class ColoredLevelFormatter(logging.Formatter):
"""A logging formatter which colorizes the %(levelname)..s part of the """A logging formatter which colorizes the %(levelname)..s part of the
log format passed to __init__.""" log format passed to __init__."""
@ -306,34 +395,30 @@ def pytest_addoption(parser: Parser) -> None:
) )
_HandlerType = TypeVar("_HandlerType", bound=logging.Handler)
# Not using @contextmanager for performance reasons. # Not using @contextmanager for performance reasons.
class catching_logs: class catching_logs:
"""Context manager that prepares the whole logging machinery properly.""" """Context manager that prepares the whole logging machinery properly."""
__slots__ = ("handler", "level", "orig_level") __slots__ = ("handler", "level", "catcher")
def __init__(self, handler: _HandlerType, level: Optional[int] = None) -> None: def __init__(self, handler: _HandlerType, level: Optional[int] = None) -> None:
self.handler = handler self.handler = handler
self.level = level self.level = level
self.catcher: Optional[CatchedLoggers] = None
def __enter__(self): def __enter__(self):
root_logger = logging.getLogger() self.catcher = CatchedLoggers(
if self.level is not None: LoggerPropagateInterceptor.non_propagating_loggers | {logging.getLogger()},
self.handler.setLevel(self.level) self.handler,
root_logger.addHandler(self.handler) self.level,
if self.level is not None: )
self.orig_level = root_logger.level LoggerPropagateInterceptor.catchers.add(self.catcher)
root_logger.setLevel(min(self.orig_level, self.level))
return self.handler return self.handler
def __exit__(self, type, value, traceback): def __exit__(self, type, value, traceback):
root_logger = logging.getLogger() assert self.catcher
if self.level is not None: LoggerPropagateInterceptor.catchers.remove(self.catcher)
root_logger.setLevel(self.orig_level) self.catcher.release_all_loggers()
root_logger.removeHandler(self.handler)
class LogCaptureHandler(logging_StreamHandler): class LogCaptureHandler(logging_StreamHandler):

View File

@ -1074,6 +1074,35 @@ def test_log_set_path(pytester: Pytester) -> None:
assert "message from test 2" in content assert "message from test 2" in content
def test_log_propagation_false(pytester: Pytester) -> None:
pytester.makepyfile(
"""
import pytest
import logging
logging.getLogger('foo').propagate = False
def test_log_file(request):
plugin = request.config.pluginmanager.getplugin('logging-plugin')
logging.getLogger().warning("log goes to root logger")
logging.getLogger('foo').warning("log goes to initially non-propagating logger")
logging.getLogger('foo.bar').propagate = False
logging.getLogger('foo.bar').warning("log goes to propagation-disabled-in-test logger")
assert False, "intentionally fail to trigger report logging output"
"""
)
result = pytester.runpytest("-s")
result.stdout.re_match_lines(
[
"-+ Captured log call -+",
r"WARNING\s+root:test_log_propagation_false.py:8\s+log goes to root logger",
r"WARNING\s+foo:test_log_propagation_false.py:9\s+log goes to initially non-propagating logger",
r"WARNING\s+foo.bar:test_log_propagation_false.py:11\s+log goes to propagation-disabled-in-test logger",
]
)
def test_colored_captured_log(pytester: Pytester) -> None: def test_colored_captured_log(pytester: Pytester) -> None:
"""Test that the level names of captured log messages of a failing test """Test that the level names of captured log messages of a failing test
are colored.""" are colored."""