488 lines
		
	
	
		
			18 KiB
		
	
	
	
		
			Python
		
	
	
	
			
		
		
	
	
			488 lines
		
	
	
		
			18 KiB
		
	
	
	
		
			Python
		
	
	
	
""" Access and control log capturing. """
 | 
						|
from __future__ import absolute_import, division, print_function
 | 
						|
 | 
						|
import logging
 | 
						|
from contextlib import closing, contextmanager
 | 
						|
import re
 | 
						|
import six
 | 
						|
 | 
						|
from _pytest.config import create_terminal_writer
 | 
						|
import pytest
 | 
						|
import py
 | 
						|
 | 
						|
 | 
						|
DEFAULT_LOG_FORMAT = '%(filename)-25s %(lineno)4d %(levelname)-8s %(message)s'
 | 
						|
DEFAULT_LOG_DATE_FORMAT = '%H:%M:%S'
 | 
						|
 | 
						|
 | 
						|
class ColoredLevelFormatter(logging.Formatter):
 | 
						|
    """
 | 
						|
    Colorize the %(levelname)..s part of the log format passed to __init__.
 | 
						|
    """
 | 
						|
 | 
						|
    LOGLEVEL_COLOROPTS = {
 | 
						|
        logging.CRITICAL: {'red'},
 | 
						|
        logging.ERROR: {'red', 'bold'},
 | 
						|
        logging.WARNING: {'yellow'},
 | 
						|
        logging.WARN: {'yellow'},
 | 
						|
        logging.INFO: {'green'},
 | 
						|
        logging.DEBUG: {'purple'},
 | 
						|
        logging.NOTSET: set(),
 | 
						|
    }
 | 
						|
    LEVELNAME_FMT_REGEX = re.compile(r'%\(levelname\)([+-]?\d*s)')
 | 
						|
 | 
						|
    def __init__(self, terminalwriter, *args, **kwargs):
 | 
						|
        super(ColoredLevelFormatter, self).__init__(
 | 
						|
            *args, **kwargs)
 | 
						|
        if six.PY2:
 | 
						|
            self._original_fmt = self._fmt
 | 
						|
        else:
 | 
						|
            self._original_fmt = self._style._fmt
 | 
						|
        self._level_to_fmt_mapping = {}
 | 
						|
 | 
						|
        levelname_fmt_match = self.LEVELNAME_FMT_REGEX.search(self._fmt)
 | 
						|
        if not levelname_fmt_match:
 | 
						|
            return
 | 
						|
        levelname_fmt = levelname_fmt_match.group()
 | 
						|
 | 
						|
        for level, color_opts in self.LOGLEVEL_COLOROPTS.items():
 | 
						|
            formatted_levelname = levelname_fmt % {
 | 
						|
                'levelname': logging.getLevelName(level)}
 | 
						|
 | 
						|
            # add ANSI escape sequences around the formatted levelname
 | 
						|
            color_kwargs = {name: True for name in color_opts}
 | 
						|
            colorized_formatted_levelname = terminalwriter.markup(
 | 
						|
                formatted_levelname, **color_kwargs)
 | 
						|
            self._level_to_fmt_mapping[level] = self.LEVELNAME_FMT_REGEX.sub(
 | 
						|
                colorized_formatted_levelname,
 | 
						|
                self._fmt)
 | 
						|
 | 
						|
    def format(self, record):
 | 
						|
        fmt = self._level_to_fmt_mapping.get(
 | 
						|
            record.levelno, self._original_fmt)
 | 
						|
        if six.PY2:
 | 
						|
            self._fmt = fmt
 | 
						|
        else:
 | 
						|
            self._style._fmt = fmt
 | 
						|
        return super(ColoredLevelFormatter, self).format(record)
 | 
						|
 | 
						|
 | 
						|
def get_option_ini(config, *names):
 | 
						|
    for name in names:
 | 
						|
        ret = config.getoption(name)  # 'default' arg won't work as expected
 | 
						|
        if ret is None:
 | 
						|
            ret = config.getini(name)
 | 
						|
        if ret:
 | 
						|
            return ret
 | 
						|
 | 
						|
 | 
						|
def pytest_addoption(parser):
 | 
						|
    """Add options to control log capturing."""
 | 
						|
    group = parser.getgroup('logging')
 | 
						|
 | 
						|
    def add_option_ini(option, dest, default=None, type=None, **kwargs):
 | 
						|
        parser.addini(dest, default=default, type=type,
 | 
						|
                      help='default value for ' + option)
 | 
						|
        group.addoption(option, dest=dest, **kwargs)
 | 
						|
 | 
						|
    add_option_ini(
 | 
						|
        '--no-print-logs',
 | 
						|
        dest='log_print', action='store_const', const=False, default=True,
 | 
						|
        type='bool',
 | 
						|
        help='disable printing caught logs on failed tests.')
 | 
						|
    add_option_ini(
 | 
						|
        '--log-level',
 | 
						|
        dest='log_level', default=None,
 | 
						|
        help='logging level used by the logging module')
 | 
						|
    add_option_ini(
 | 
						|
        '--log-format',
 | 
						|
        dest='log_format', default=DEFAULT_LOG_FORMAT,
 | 
						|
        help='log format as used by the logging module.')
 | 
						|
    add_option_ini(
 | 
						|
        '--log-date-format',
 | 
						|
        dest='log_date_format', default=DEFAULT_LOG_DATE_FORMAT,
 | 
						|
        help='log date format as used by the logging module.')
 | 
						|
    parser.addini(
 | 
						|
        'log_cli', default=False, type='bool',
 | 
						|
        help='enable log display during test run (also known as "live logging").')
 | 
						|
    add_option_ini(
 | 
						|
        '--log-cli-level',
 | 
						|
        dest='log_cli_level', default=None,
 | 
						|
        help='cli logging level.')
 | 
						|
    add_option_ini(
 | 
						|
        '--log-cli-format',
 | 
						|
        dest='log_cli_format', default=None,
 | 
						|
        help='log format as used by the logging module.')
 | 
						|
    add_option_ini(
 | 
						|
        '--log-cli-date-format',
 | 
						|
        dest='log_cli_date_format', default=None,
 | 
						|
        help='log date format as used by the logging module.')
 | 
						|
    add_option_ini(
 | 
						|
        '--log-file',
 | 
						|
        dest='log_file', default=None,
 | 
						|
        help='path to a file when logging will be written to.')
 | 
						|
    add_option_ini(
 | 
						|
        '--log-file-level',
 | 
						|
        dest='log_file_level', default=None,
 | 
						|
        help='log file logging level.')
 | 
						|
    add_option_ini(
 | 
						|
        '--log-file-format',
 | 
						|
        dest='log_file_format', default=DEFAULT_LOG_FORMAT,
 | 
						|
        help='log format as used by the logging module.')
 | 
						|
    add_option_ini(
 | 
						|
        '--log-file-date-format',
 | 
						|
        dest='log_file_date_format', default=DEFAULT_LOG_DATE_FORMAT,
 | 
						|
        help='log date format as used by the logging module.')
 | 
						|
 | 
						|
 | 
						|
@contextmanager
 | 
						|
def catching_logs(handler, formatter=None, level=None):
 | 
						|
    """Context manager that prepares the whole logging machinery properly."""
 | 
						|
    root_logger = logging.getLogger()
 | 
						|
 | 
						|
    if formatter is not None:
 | 
						|
        handler.setFormatter(formatter)
 | 
						|
    if level is not None:
 | 
						|
        handler.setLevel(level)
 | 
						|
 | 
						|
    # Adding the same handler twice would confuse logging system.
 | 
						|
    # Just don't do that.
 | 
						|
    add_new_handler = handler not in root_logger.handlers
 | 
						|
 | 
						|
    if add_new_handler:
 | 
						|
        root_logger.addHandler(handler)
 | 
						|
    if level is not None:
 | 
						|
        orig_level = root_logger.level
 | 
						|
        root_logger.setLevel(level)
 | 
						|
    try:
 | 
						|
        yield handler
 | 
						|
    finally:
 | 
						|
        if level is not None:
 | 
						|
            root_logger.setLevel(orig_level)
 | 
						|
        if add_new_handler:
 | 
						|
            root_logger.removeHandler(handler)
 | 
						|
 | 
						|
 | 
						|
class LogCaptureHandler(logging.StreamHandler):
 | 
						|
    """A logging handler that stores log records and the log text."""
 | 
						|
 | 
						|
    def __init__(self):
 | 
						|
        """Creates a new log handler."""
 | 
						|
        logging.StreamHandler.__init__(self, py.io.TextIO())
 | 
						|
        self.records = []
 | 
						|
 | 
						|
    def emit(self, record):
 | 
						|
        """Keep the log records in a list in addition to the log text."""
 | 
						|
        self.records.append(record)
 | 
						|
        logging.StreamHandler.emit(self, record)
 | 
						|
 | 
						|
 | 
						|
class LogCaptureFixture(object):
 | 
						|
    """Provides access and control of log capturing."""
 | 
						|
 | 
						|
    def __init__(self, item):
 | 
						|
        """Creates a new funcarg."""
 | 
						|
        self._item = item
 | 
						|
        self._initial_log_levels = {}  # type: Dict[str, int] # dict of log name -> log level
 | 
						|
 | 
						|
    def _finalize(self):
 | 
						|
        """Finalizes the fixture.
 | 
						|
 | 
						|
        This restores the log levels changed by :meth:`set_level`.
 | 
						|
        """
 | 
						|
        # restore log levels
 | 
						|
        for logger_name, level in self._initial_log_levels.items():
 | 
						|
            logger = logging.getLogger(logger_name)
 | 
						|
            logger.setLevel(level)
 | 
						|
 | 
						|
    @property
 | 
						|
    def handler(self):
 | 
						|
        return self._item.catch_log_handler
 | 
						|
 | 
						|
    def get_records(self, when):
 | 
						|
        """
 | 
						|
        Get the logging records for one of the possible test phases.
 | 
						|
 | 
						|
        :param str when:
 | 
						|
            Which test phase to obtain the records from. Valid values are: "setup", "call" and "teardown".
 | 
						|
 | 
						|
        :rtype: List[logging.LogRecord]
 | 
						|
        :return: the list of captured records at the given stage
 | 
						|
 | 
						|
        .. versionadded:: 3.4
 | 
						|
        """
 | 
						|
        handler = self._item.catch_log_handlers.get(when)
 | 
						|
        if handler:
 | 
						|
            return handler.records
 | 
						|
        else:
 | 
						|
            return []
 | 
						|
 | 
						|
    @property
 | 
						|
    def text(self):
 | 
						|
        """Returns the log text."""
 | 
						|
        return self.handler.stream.getvalue()
 | 
						|
 | 
						|
    @property
 | 
						|
    def records(self):
 | 
						|
        """Returns the list of log records."""
 | 
						|
        return self.handler.records
 | 
						|
 | 
						|
    @property
 | 
						|
    def record_tuples(self):
 | 
						|
        """Returns a list of a striped down version of log records intended
 | 
						|
        for use in assertion comparison.
 | 
						|
 | 
						|
        The format of the tuple is:
 | 
						|
 | 
						|
            (logger_name, log_level, message)
 | 
						|
        """
 | 
						|
        return [(r.name, r.levelno, r.getMessage()) for r in self.records]
 | 
						|
 | 
						|
    def clear(self):
 | 
						|
        """Reset the list of log records."""
 | 
						|
        self.handler.records = []
 | 
						|
 | 
						|
    def set_level(self, level, logger=None):
 | 
						|
        """Sets the level for capturing of logs. The level will be restored to its previous value at the end of
 | 
						|
        the test.
 | 
						|
 | 
						|
        :param int level: the logger to level.
 | 
						|
        :param str logger: the logger to update the level. If not given, the root logger level is updated.
 | 
						|
 | 
						|
        .. versionchanged:: 3.4
 | 
						|
            The levels of the loggers changed by this function will be restored to their initial values at the
 | 
						|
            end of the test.
 | 
						|
        """
 | 
						|
        logger_name = logger
 | 
						|
        logger = logging.getLogger(logger_name)
 | 
						|
        # save the original log-level to restore it during teardown
 | 
						|
        self._initial_log_levels.setdefault(logger_name, logger.level)
 | 
						|
        logger.setLevel(level)
 | 
						|
 | 
						|
    @contextmanager
 | 
						|
    def at_level(self, level, logger=None):
 | 
						|
        """Context manager that sets the level for capturing of logs. After the end of the 'with' statement the
 | 
						|
        level is restored to its original value.
 | 
						|
 | 
						|
        :param int level: the logger to level.
 | 
						|
        :param str logger: the logger to update the level. If not given, the root logger level is updated.
 | 
						|
        """
 | 
						|
        logger = logging.getLogger(logger)
 | 
						|
        orig_level = logger.level
 | 
						|
        logger.setLevel(level)
 | 
						|
        try:
 | 
						|
            yield
 | 
						|
        finally:
 | 
						|
            logger.setLevel(orig_level)
 | 
						|
 | 
						|
 | 
						|
@pytest.fixture
 | 
						|
def caplog(request):
 | 
						|
    """Access and control log capturing.
 | 
						|
 | 
						|
    Captured logs are available through the following methods::
 | 
						|
 | 
						|
    * caplog.text()          -> string containing formatted log output
 | 
						|
    * caplog.records()       -> list of logging.LogRecord instances
 | 
						|
    * caplog.record_tuples() -> list of (logger_name, level, message) tuples
 | 
						|
    """
 | 
						|
    result = LogCaptureFixture(request.node)
 | 
						|
    yield result
 | 
						|
    result._finalize()
 | 
						|
 | 
						|
 | 
						|
def get_actual_log_level(config, *setting_names):
 | 
						|
    """Return the actual logging level."""
 | 
						|
 | 
						|
    for setting_name in setting_names:
 | 
						|
        log_level = config.getoption(setting_name)
 | 
						|
        if log_level is None:
 | 
						|
            log_level = config.getini(setting_name)
 | 
						|
        if log_level:
 | 
						|
            break
 | 
						|
    else:
 | 
						|
        return
 | 
						|
 | 
						|
    if isinstance(log_level, six.string_types):
 | 
						|
        log_level = log_level.upper()
 | 
						|
    try:
 | 
						|
        return int(getattr(logging, log_level, log_level))
 | 
						|
    except ValueError:
 | 
						|
        # Python logging does not recognise this as a logging level
 | 
						|
        raise pytest.UsageError(
 | 
						|
            "'{0}' is not recognized as a logging level name for "
 | 
						|
            "'{1}'. Please consider passing the "
 | 
						|
            "logging level num instead.".format(
 | 
						|
                log_level,
 | 
						|
                setting_name))
 | 
						|
 | 
						|
 | 
						|
def pytest_configure(config):
 | 
						|
    config.pluginmanager.register(LoggingPlugin(config), 'logging-plugin')
 | 
						|
 | 
						|
 | 
						|
@contextmanager
 | 
						|
def _dummy_context_manager():
 | 
						|
    yield
 | 
						|
 | 
						|
 | 
						|
class LoggingPlugin(object):
 | 
						|
    """Attaches to the logging module and captures log messages for each test.
 | 
						|
    """
 | 
						|
 | 
						|
    def __init__(self, config):
 | 
						|
        """Creates a new plugin to capture log messages.
 | 
						|
 | 
						|
        The formatter can be safely shared across all handlers so
 | 
						|
        create a single one for the entire test session here.
 | 
						|
        """
 | 
						|
        self._config = config
 | 
						|
 | 
						|
        # enable verbose output automatically if live logging is enabled
 | 
						|
        if self._config.getini('log_cli') and not config.getoption('verbose'):
 | 
						|
            # sanity check: terminal reporter should not have been loaded at this point
 | 
						|
            assert self._config.pluginmanager.get_plugin('terminalreporter') is None
 | 
						|
            config.option.verbose = 1
 | 
						|
 | 
						|
        self.print_logs = get_option_ini(config, 'log_print')
 | 
						|
        self.formatter = logging.Formatter(get_option_ini(config, 'log_format'),
 | 
						|
                                           get_option_ini(config, 'log_date_format'))
 | 
						|
        self.log_level = get_actual_log_level(config, 'log_level')
 | 
						|
 | 
						|
        log_file = get_option_ini(config, 'log_file')
 | 
						|
        if log_file:
 | 
						|
            self.log_file_level = get_actual_log_level(config, 'log_file_level')
 | 
						|
 | 
						|
            log_file_format = get_option_ini(config, 'log_file_format', 'log_format')
 | 
						|
            log_file_date_format = get_option_ini(config, 'log_file_date_format', 'log_date_format')
 | 
						|
            # Each pytest runtests session will write to a clean logfile
 | 
						|
            self.log_file_handler = logging.FileHandler(log_file, mode='w')
 | 
						|
            log_file_formatter = logging.Formatter(log_file_format, datefmt=log_file_date_format)
 | 
						|
            self.log_file_handler.setFormatter(log_file_formatter)
 | 
						|
        else:
 | 
						|
            self.log_file_handler = None
 | 
						|
 | 
						|
        # initialized during pytest_runtestloop
 | 
						|
        self.log_cli_handler = None
 | 
						|
 | 
						|
    @contextmanager
 | 
						|
    def _runtest_for(self, item, when):
 | 
						|
        """Implements the internals of pytest_runtest_xxx() hook."""
 | 
						|
        with catching_logs(LogCaptureHandler(),
 | 
						|
                           formatter=self.formatter, level=self.log_level) as log_handler:
 | 
						|
            if self.log_cli_handler:
 | 
						|
                self.log_cli_handler.set_when(when)
 | 
						|
            if not hasattr(item, 'catch_log_handlers'):
 | 
						|
                item.catch_log_handlers = {}
 | 
						|
            item.catch_log_handlers[when] = log_handler
 | 
						|
            item.catch_log_handler = log_handler
 | 
						|
            try:
 | 
						|
                yield  # run test
 | 
						|
            finally:
 | 
						|
                del item.catch_log_handler
 | 
						|
                if when == 'teardown':
 | 
						|
                    del item.catch_log_handlers
 | 
						|
 | 
						|
            if self.print_logs:
 | 
						|
                # Add a captured log section to the report.
 | 
						|
                log = log_handler.stream.getvalue().strip()
 | 
						|
                item.add_report_section(when, 'log', log)
 | 
						|
 | 
						|
    @pytest.hookimpl(hookwrapper=True)
 | 
						|
    def pytest_runtest_setup(self, item):
 | 
						|
        with self._runtest_for(item, 'setup'):
 | 
						|
            yield
 | 
						|
 | 
						|
    @pytest.hookimpl(hookwrapper=True)
 | 
						|
    def pytest_runtest_call(self, item):
 | 
						|
        with self._runtest_for(item, 'call'):
 | 
						|
            yield
 | 
						|
 | 
						|
    @pytest.hookimpl(hookwrapper=True)
 | 
						|
    def pytest_runtest_teardown(self, item):
 | 
						|
        with self._runtest_for(item, 'teardown'):
 | 
						|
            yield
 | 
						|
 | 
						|
    def pytest_runtest_logstart(self):
 | 
						|
        if self.log_cli_handler:
 | 
						|
            self.log_cli_handler.reset()
 | 
						|
 | 
						|
    @pytest.hookimpl(hookwrapper=True)
 | 
						|
    def pytest_runtestloop(self, session):
 | 
						|
        """Runs all collected test items."""
 | 
						|
        self._setup_cli_logging()
 | 
						|
        with self.live_logs_context:
 | 
						|
            if self.log_file_handler is not None:
 | 
						|
                with closing(self.log_file_handler):
 | 
						|
                    with catching_logs(self.log_file_handler,
 | 
						|
                                       level=self.log_file_level):
 | 
						|
                        yield  # run all the tests
 | 
						|
            else:
 | 
						|
                yield  # run all the tests
 | 
						|
 | 
						|
    def _setup_cli_logging(self):
 | 
						|
        """Sets up the handler and logger for the Live Logs feature, if enabled.
 | 
						|
 | 
						|
        This must be done right before starting the loop so we can access the terminal reporter plugin.
 | 
						|
        """
 | 
						|
        terminal_reporter = self._config.pluginmanager.get_plugin('terminalreporter')
 | 
						|
        if self._config.getini('log_cli') and terminal_reporter is not None:
 | 
						|
            capture_manager = self._config.pluginmanager.get_plugin('capturemanager')
 | 
						|
            log_cli_handler = _LiveLoggingStreamHandler(terminal_reporter, capture_manager)
 | 
						|
            log_cli_format = get_option_ini(self._config, 'log_cli_format', 'log_format')
 | 
						|
            log_cli_date_format = get_option_ini(self._config, 'log_cli_date_format', 'log_date_format')
 | 
						|
            if self._config.option.color != 'no' and ColoredLevelFormatter.LEVELNAME_FMT_REGEX.search(log_cli_format):
 | 
						|
                log_cli_formatter = ColoredLevelFormatter(create_terminal_writer(self._config),
 | 
						|
                                                          log_cli_format, datefmt=log_cli_date_format)
 | 
						|
            else:
 | 
						|
                log_cli_formatter = logging.Formatter(log_cli_format, datefmt=log_cli_date_format)
 | 
						|
            log_cli_level = get_actual_log_level(self._config, 'log_cli_level', 'log_level')
 | 
						|
            self.log_cli_handler = log_cli_handler
 | 
						|
            self.live_logs_context = catching_logs(log_cli_handler, formatter=log_cli_formatter, level=log_cli_level)
 | 
						|
        else:
 | 
						|
            self.live_logs_context = _dummy_context_manager()
 | 
						|
 | 
						|
 | 
						|
class _LiveLoggingStreamHandler(logging.StreamHandler):
 | 
						|
    """
 | 
						|
    Custom StreamHandler used by the live logging feature: it will write a newline before the first log message
 | 
						|
    in each test.
 | 
						|
 | 
						|
    During live logging we must also explicitly disable stdout/stderr capturing otherwise it will get captured
 | 
						|
    and won't appear in the terminal.
 | 
						|
    """
 | 
						|
 | 
						|
    def __init__(self, terminal_reporter, capture_manager):
 | 
						|
        """
 | 
						|
        :param _pytest.terminal.TerminalReporter terminal_reporter:
 | 
						|
        :param _pytest.capture.CaptureManager capture_manager:
 | 
						|
        """
 | 
						|
        logging.StreamHandler.__init__(self, stream=terminal_reporter)
 | 
						|
        self.capture_manager = capture_manager
 | 
						|
        self.reset()
 | 
						|
        self.set_when(None)
 | 
						|
 | 
						|
    def reset(self):
 | 
						|
        """Reset the handler; should be called before the start of each test"""
 | 
						|
        self._first_record_emitted = False
 | 
						|
 | 
						|
    def set_when(self, when):
 | 
						|
        """Prepares for the given test phase (setup/call/teardown)"""
 | 
						|
        self._when = when
 | 
						|
        self._section_name_shown = False
 | 
						|
 | 
						|
    def emit(self, record):
 | 
						|
        if self.capture_manager is not None:
 | 
						|
            self.capture_manager.suspend_global_capture()
 | 
						|
        try:
 | 
						|
            if not self._first_record_emitted or self._when == 'teardown':
 | 
						|
                self.stream.write('\n')
 | 
						|
                self._first_record_emitted = True
 | 
						|
            if not self._section_name_shown and self._when:
 | 
						|
                self.stream.section('live log ' + self._when, sep='-', bold=True)
 | 
						|
                self._section_name_shown = True
 | 
						|
            logging.StreamHandler.emit(self, record)
 | 
						|
        finally:
 | 
						|
            if self.capture_manager is not None:
 | 
						|
                self.capture_manager.resume_global_capture()
 |