699 lines
		
	
	
		
			26 KiB
		
	
	
	
		
			Python
		
	
	
	
			
		
		
	
	
			699 lines
		
	
	
		
			26 KiB
		
	
	
	
		
			Python
		
	
	
	
""" terminal reporting of the full testing process.
 | 
						|
 | 
						|
This is a good source for looking at the various reporting hooks.
 | 
						|
"""
 | 
						|
from __future__ import absolute_import, division, print_function
 | 
						|
 | 
						|
import itertools
 | 
						|
import platform
 | 
						|
import sys
 | 
						|
import time
 | 
						|
 | 
						|
import pluggy
 | 
						|
import py
 | 
						|
import six
 | 
						|
 | 
						|
import pytest
 | 
						|
from _pytest import nodes
 | 
						|
from _pytest.main import EXIT_OK, EXIT_TESTSFAILED, EXIT_INTERRUPTED, \
 | 
						|
    EXIT_USAGEERROR, EXIT_NOTESTSCOLLECTED
 | 
						|
 | 
						|
 | 
						|
def pytest_addoption(parser):
 | 
						|
    group = parser.getgroup("terminal reporting", "reporting", after="general")
 | 
						|
    group._addoption('-v', '--verbose', action="count",
 | 
						|
                     dest="verbose", default=0, help="increase verbosity."),
 | 
						|
    group._addoption('-q', '--quiet', action="count",
 | 
						|
                     dest="quiet", default=0, help="decrease verbosity."),
 | 
						|
    group._addoption('-r',
 | 
						|
                     action="store", dest="reportchars", default='', metavar="chars",
 | 
						|
                     help="show extra test summary info as specified by chars (f)ailed, "
 | 
						|
                     "(E)error, (s)skipped, (x)failed, (X)passed, "
 | 
						|
                     "(p)passed, (P)passed with output, (a)all except pP. "
 | 
						|
                     "Warnings are displayed at all times except when "
 | 
						|
                     "--disable-warnings is set")
 | 
						|
    group._addoption('--disable-warnings', '--disable-pytest-warnings', default=False,
 | 
						|
                     dest='disable_warnings', action='store_true',
 | 
						|
                     help='disable warnings summary')
 | 
						|
    group._addoption('-l', '--showlocals',
 | 
						|
                     action="store_true", dest="showlocals", default=False,
 | 
						|
                     help="show locals in tracebacks (disabled by default).")
 | 
						|
    group._addoption('--tb', metavar="style",
 | 
						|
                     action="store", dest="tbstyle", default='auto',
 | 
						|
                     choices=['auto', 'long', 'short', 'no', 'line', 'native'],
 | 
						|
                     help="traceback print mode (auto/long/short/line/native/no).")
 | 
						|
    group._addoption('--fulltrace', '--full-trace',
 | 
						|
                     action="store_true", default=False,
 | 
						|
                     help="don't cut any tracebacks (default is to cut).")
 | 
						|
    group._addoption('--color', metavar="color",
 | 
						|
                     action="store", dest="color", default='auto',
 | 
						|
                     choices=['yes', 'no', 'auto'],
 | 
						|
                     help="color terminal output (yes/no/auto).")
 | 
						|
 | 
						|
    parser.addini("console_output_style",
 | 
						|
                  help="console output: classic or with additional progress information (classic|progress).",
 | 
						|
                  default='progress')
 | 
						|
 | 
						|
 | 
						|
def pytest_configure(config):
 | 
						|
    config.option.verbose -= config.option.quiet
 | 
						|
    reporter = TerminalReporter(config, sys.stdout)
 | 
						|
    config.pluginmanager.register(reporter, 'terminalreporter')
 | 
						|
    if config.option.debug or config.option.traceconfig:
 | 
						|
        def mywriter(tags, args):
 | 
						|
            msg = " ".join(map(str, args))
 | 
						|
            reporter.write_line("[traceconfig] " + msg)
 | 
						|
        config.trace.root.setprocessor("pytest:config", mywriter)
 | 
						|
 | 
						|
 | 
						|
def getreportopt(config):
 | 
						|
    reportopts = ""
 | 
						|
    reportchars = config.option.reportchars
 | 
						|
    if not config.option.disable_warnings and 'w' not in reportchars:
 | 
						|
        reportchars += 'w'
 | 
						|
    elif config.option.disable_warnings and 'w' in reportchars:
 | 
						|
        reportchars = reportchars.replace('w', '')
 | 
						|
    if reportchars:
 | 
						|
        for char in reportchars:
 | 
						|
            if char not in reportopts and char != 'a':
 | 
						|
                reportopts += char
 | 
						|
            elif char == 'a':
 | 
						|
                reportopts = 'fEsxXw'
 | 
						|
    return reportopts
 | 
						|
 | 
						|
 | 
						|
def pytest_report_teststatus(report):
 | 
						|
    if report.passed:
 | 
						|
        letter = "."
 | 
						|
    elif report.skipped:
 | 
						|
        letter = "s"
 | 
						|
    elif report.failed:
 | 
						|
        letter = "F"
 | 
						|
        if report.when != "call":
 | 
						|
            letter = "f"
 | 
						|
    return report.outcome, letter, report.outcome.upper()
 | 
						|
 | 
						|
 | 
						|
class WarningReport:
 | 
						|
    """
 | 
						|
    Simple structure to hold warnings information captured by ``pytest_logwarning``.
 | 
						|
    """
 | 
						|
 | 
						|
    def __init__(self, code, message, nodeid=None, fslocation=None):
 | 
						|
        """
 | 
						|
        :param code: unused
 | 
						|
        :param str message: user friendly message about the warning
 | 
						|
        :param str|None nodeid: node id that generated the warning (see ``get_location``).
 | 
						|
        :param tuple|py.path.local fslocation:
 | 
						|
            file system location of the source of the warning (see ``get_location``).
 | 
						|
        """
 | 
						|
        self.code = code
 | 
						|
        self.message = message
 | 
						|
        self.nodeid = nodeid
 | 
						|
        self.fslocation = fslocation
 | 
						|
 | 
						|
    def get_location(self, config):
 | 
						|
        """
 | 
						|
        Returns the more user-friendly information about the location
 | 
						|
        of a warning, or None.
 | 
						|
        """
 | 
						|
        if self.nodeid:
 | 
						|
            return self.nodeid
 | 
						|
        if self.fslocation:
 | 
						|
            if isinstance(self.fslocation, tuple) and len(self.fslocation) >= 2:
 | 
						|
                filename, linenum = self.fslocation[:2]
 | 
						|
                relpath = py.path.local(filename).relto(config.invocation_dir)
 | 
						|
                return '%s:%s' % (relpath, linenum)
 | 
						|
            else:
 | 
						|
                return str(self.fslocation)
 | 
						|
        return None
 | 
						|
 | 
						|
 | 
						|
class TerminalReporter:
 | 
						|
    def __init__(self, config, file=None):
 | 
						|
        import _pytest.config
 | 
						|
        self.config = config
 | 
						|
        self.verbosity = self.config.option.verbose
 | 
						|
        self.showheader = self.verbosity >= 0
 | 
						|
        self.showfspath = self.verbosity >= 0
 | 
						|
        self.showlongtestinfo = self.verbosity > 0
 | 
						|
        self._numcollected = 0
 | 
						|
        self._session = None
 | 
						|
 | 
						|
        self.stats = {}
 | 
						|
        self.startdir = py.path.local()
 | 
						|
        if file is None:
 | 
						|
            file = sys.stdout
 | 
						|
        self._tw = _pytest.config.create_terminal_writer(config, file)
 | 
						|
        self._screen_width = self._tw.fullwidth
 | 
						|
        self.currentfspath = None
 | 
						|
        self.reportchars = getreportopt(config)
 | 
						|
        self.hasmarkup = self._tw.hasmarkup
 | 
						|
        self.isatty = file.isatty()
 | 
						|
        self._progress_items_reported = 0
 | 
						|
        self._show_progress_info = self.config.getini('console_output_style') == 'progress'
 | 
						|
 | 
						|
    def hasopt(self, char):
 | 
						|
        char = {'xfailed': 'x', 'skipped': 's'}.get(char, char)
 | 
						|
        return char in self.reportchars
 | 
						|
 | 
						|
    def write_fspath_result(self, nodeid, res):
 | 
						|
        fspath = self.config.rootdir.join(nodeid.split("::")[0])
 | 
						|
        if fspath != self.currentfspath:
 | 
						|
            if self.currentfspath is not None:
 | 
						|
                self._write_progress_information_filling_space()
 | 
						|
            self.currentfspath = fspath
 | 
						|
            fspath = self.startdir.bestrelpath(fspath)
 | 
						|
            self._tw.line()
 | 
						|
            self._tw.write(fspath + " ")
 | 
						|
        self._tw.write(res)
 | 
						|
 | 
						|
    def write_ensure_prefix(self, prefix, extra="", **kwargs):
 | 
						|
        if self.currentfspath != prefix:
 | 
						|
            self._tw.line()
 | 
						|
            self.currentfspath = prefix
 | 
						|
            self._tw.write(prefix)
 | 
						|
        if extra:
 | 
						|
            self._tw.write(extra, **kwargs)
 | 
						|
            self.currentfspath = -2
 | 
						|
            self._write_progress_information_filling_space()
 | 
						|
 | 
						|
    def ensure_newline(self):
 | 
						|
        if self.currentfspath:
 | 
						|
            self._tw.line()
 | 
						|
            self.currentfspath = None
 | 
						|
 | 
						|
    def write(self, content, **markup):
 | 
						|
        self._tw.write(content, **markup)
 | 
						|
 | 
						|
    def write_line(self, line, **markup):
 | 
						|
        if not isinstance(line, six.text_type):
 | 
						|
            line = six.text_type(line, errors="replace")
 | 
						|
        self.ensure_newline()
 | 
						|
        self._tw.line(line, **markup)
 | 
						|
 | 
						|
    def rewrite(self, line, **markup):
 | 
						|
        """
 | 
						|
        Rewinds the terminal cursor to the beginning and writes the given line.
 | 
						|
 | 
						|
        :kwarg erase: if True, will also add spaces until the full terminal width to ensure
 | 
						|
            previous lines are properly erased.
 | 
						|
 | 
						|
        The rest of the keyword arguments are markup instructions.
 | 
						|
        """
 | 
						|
        erase = markup.pop('erase', False)
 | 
						|
        if erase:
 | 
						|
            fill_count = self._tw.fullwidth - len(line) - 1
 | 
						|
            fill = ' ' * fill_count
 | 
						|
        else:
 | 
						|
            fill = ''
 | 
						|
        line = str(line)
 | 
						|
        self._tw.write("\r" + line + fill, **markup)
 | 
						|
 | 
						|
    def write_sep(self, sep, title=None, **markup):
 | 
						|
        self.ensure_newline()
 | 
						|
        self._tw.sep(sep, title, **markup)
 | 
						|
 | 
						|
    def section(self, title, sep="=", **kw):
 | 
						|
        self._tw.sep(sep, title, **kw)
 | 
						|
 | 
						|
    def line(self, msg, **kw):
 | 
						|
        self._tw.line(msg, **kw)
 | 
						|
 | 
						|
    def pytest_internalerror(self, excrepr):
 | 
						|
        for line in six.text_type(excrepr).split("\n"):
 | 
						|
            self.write_line("INTERNALERROR> " + line)
 | 
						|
        return 1
 | 
						|
 | 
						|
    def pytest_logwarning(self, code, fslocation, message, nodeid):
 | 
						|
        warnings = self.stats.setdefault("warnings", [])
 | 
						|
        warning = WarningReport(code=code, fslocation=fslocation,
 | 
						|
                                message=message, nodeid=nodeid)
 | 
						|
        warnings.append(warning)
 | 
						|
 | 
						|
    def pytest_plugin_registered(self, plugin):
 | 
						|
        if self.config.option.traceconfig:
 | 
						|
            msg = "PLUGIN registered: %s" % (plugin,)
 | 
						|
            # XXX this event may happen during setup/teardown time
 | 
						|
            #     which unfortunately captures our output here
 | 
						|
            #     which garbles our output if we use self.write_line
 | 
						|
            self.write_line(msg)
 | 
						|
 | 
						|
    def pytest_deselected(self, items):
 | 
						|
        self.stats.setdefault('deselected', []).extend(items)
 | 
						|
 | 
						|
    def pytest_runtest_logstart(self, nodeid, location):
 | 
						|
        # ensure that the path is printed before the
 | 
						|
        # 1st test of a module starts running
 | 
						|
        if self.showlongtestinfo:
 | 
						|
            line = self._locationline(nodeid, *location)
 | 
						|
            self.write_ensure_prefix(line, "")
 | 
						|
        elif self.showfspath:
 | 
						|
            fsid = nodeid.split("::")[0]
 | 
						|
            self.write_fspath_result(fsid, "")
 | 
						|
 | 
						|
    def pytest_runtest_logreport(self, report):
 | 
						|
        rep = report
 | 
						|
        res = self.config.hook.pytest_report_teststatus(report=rep)
 | 
						|
        cat, letter, word = res
 | 
						|
        if isinstance(word, tuple):
 | 
						|
            word, markup = word
 | 
						|
        else:
 | 
						|
            markup = None
 | 
						|
        self.stats.setdefault(cat, []).append(rep)
 | 
						|
        self._tests_ran = True
 | 
						|
        if not letter and not word:
 | 
						|
            # probably passed setup/teardown
 | 
						|
            return
 | 
						|
        running_xdist = hasattr(rep, 'node')
 | 
						|
        self._progress_items_reported += 1
 | 
						|
        if self.verbosity <= 0:
 | 
						|
            if not running_xdist and self.showfspath:
 | 
						|
                self.write_fspath_result(rep.nodeid, letter)
 | 
						|
            else:
 | 
						|
                self._tw.write(letter)
 | 
						|
            self._write_progress_if_past_edge()
 | 
						|
        else:
 | 
						|
            if markup is None:
 | 
						|
                if rep.passed:
 | 
						|
                    markup = {'green': True}
 | 
						|
                elif rep.failed:
 | 
						|
                    markup = {'red': True}
 | 
						|
                elif rep.skipped:
 | 
						|
                    markup = {'yellow': True}
 | 
						|
                else:
 | 
						|
                    markup = {}
 | 
						|
            line = self._locationline(rep.nodeid, *rep.location)
 | 
						|
            if not running_xdist:
 | 
						|
                self.write_ensure_prefix(line, word, **markup)
 | 
						|
            else:
 | 
						|
                self.ensure_newline()
 | 
						|
                self._tw.write("[%s]" % rep.node.gateway.id)
 | 
						|
                if self._show_progress_info:
 | 
						|
                    self._tw.write(self._get_progress_information_message() + " ", cyan=True)
 | 
						|
                else:
 | 
						|
                    self._tw.write(' ')
 | 
						|
                self._tw.write(word, **markup)
 | 
						|
                self._tw.write(" " + line)
 | 
						|
                self.currentfspath = -2
 | 
						|
 | 
						|
    def _write_progress_if_past_edge(self):
 | 
						|
        if not self._show_progress_info:
 | 
						|
            return
 | 
						|
        last_item = self._progress_items_reported == self._session.testscollected
 | 
						|
        if last_item:
 | 
						|
            self._write_progress_information_filling_space()
 | 
						|
            return
 | 
						|
 | 
						|
        past_edge = self._tw.chars_on_current_line + self._PROGRESS_LENGTH + 1 >= self._screen_width
 | 
						|
        if past_edge:
 | 
						|
            msg = self._get_progress_information_message()
 | 
						|
            self._tw.write(msg + '\n', cyan=True)
 | 
						|
 | 
						|
    _PROGRESS_LENGTH = len(' [100%]')
 | 
						|
 | 
						|
    def _get_progress_information_message(self):
 | 
						|
        progress = self._progress_items_reported * 100 // self._session.testscollected
 | 
						|
        return ' [{:3d}%]'.format(progress)
 | 
						|
 | 
						|
    def _write_progress_information_filling_space(self):
 | 
						|
        if not self._show_progress_info:
 | 
						|
            return
 | 
						|
        msg = self._get_progress_information_message()
 | 
						|
        fill = ' ' * (self._tw.fullwidth - self._tw.chars_on_current_line - len(msg) - 1)
 | 
						|
        self.write(fill + msg, cyan=True)
 | 
						|
 | 
						|
    def pytest_collection(self):
 | 
						|
        if not self.isatty and self.config.option.verbose >= 1:
 | 
						|
            self.write("collecting ... ", bold=True)
 | 
						|
 | 
						|
    def pytest_collectreport(self, report):
 | 
						|
        if report.failed:
 | 
						|
            self.stats.setdefault("error", []).append(report)
 | 
						|
        elif report.skipped:
 | 
						|
            self.stats.setdefault("skipped", []).append(report)
 | 
						|
        items = [x for x in report.result if isinstance(x, pytest.Item)]
 | 
						|
        self._numcollected += len(items)
 | 
						|
        if self.isatty:
 | 
						|
            # self.write_fspath_result(report.nodeid, 'E')
 | 
						|
            self.report_collect()
 | 
						|
 | 
						|
    def report_collect(self, final=False):
 | 
						|
        if self.config.option.verbose < 0:
 | 
						|
            return
 | 
						|
 | 
						|
        errors = len(self.stats.get('error', []))
 | 
						|
        skipped = len(self.stats.get('skipped', []))
 | 
						|
        if final:
 | 
						|
            line = "collected "
 | 
						|
        else:
 | 
						|
            line = "collecting "
 | 
						|
        line += str(self._numcollected) + " item" + ('' if self._numcollected == 1 else 's')
 | 
						|
        if errors:
 | 
						|
            line += " / %d errors" % errors
 | 
						|
        if skipped:
 | 
						|
            line += " / %d skipped" % skipped
 | 
						|
        if self.isatty:
 | 
						|
            self.rewrite(line, bold=True, erase=True)
 | 
						|
            if final:
 | 
						|
                self.write('\n')
 | 
						|
        else:
 | 
						|
            self.write_line(line)
 | 
						|
 | 
						|
    def pytest_collection_modifyitems(self):
 | 
						|
        self.report_collect(True)
 | 
						|
 | 
						|
    @pytest.hookimpl(trylast=True)
 | 
						|
    def pytest_sessionstart(self, session):
 | 
						|
        self._session = session
 | 
						|
        self._sessionstarttime = time.time()
 | 
						|
        if not self.showheader:
 | 
						|
            return
 | 
						|
        self.write_sep("=", "test session starts", bold=True)
 | 
						|
        verinfo = platform.python_version()
 | 
						|
        msg = "platform %s -- Python %s" % (sys.platform, verinfo)
 | 
						|
        if hasattr(sys, 'pypy_version_info'):
 | 
						|
            verinfo = ".".join(map(str, sys.pypy_version_info[:3]))
 | 
						|
            msg += "[pypy-%s-%s]" % (verinfo, sys.pypy_version_info[3])
 | 
						|
        msg += ", pytest-%s, py-%s, pluggy-%s" % (
 | 
						|
               pytest.__version__, py.__version__, pluggy.__version__)
 | 
						|
        if self.verbosity > 0 or self.config.option.debug or \
 | 
						|
           getattr(self.config.option, 'pastebin', None):
 | 
						|
            msg += " -- " + str(sys.executable)
 | 
						|
        self.write_line(msg)
 | 
						|
        lines = self.config.hook.pytest_report_header(
 | 
						|
            config=self.config, startdir=self.startdir)
 | 
						|
        self._write_report_lines_from_hooks(lines)
 | 
						|
 | 
						|
    def _write_report_lines_from_hooks(self, lines):
 | 
						|
        lines.reverse()
 | 
						|
        for line in flatten(lines):
 | 
						|
            self.write_line(line)
 | 
						|
 | 
						|
    def pytest_report_header(self, config):
 | 
						|
        inifile = ""
 | 
						|
        if config.inifile:
 | 
						|
            inifile = " " + config.rootdir.bestrelpath(config.inifile)
 | 
						|
        lines = ["rootdir: %s, inifile:%s" % (config.rootdir, inifile)]
 | 
						|
 | 
						|
        plugininfo = config.pluginmanager.list_plugin_distinfo()
 | 
						|
        if plugininfo:
 | 
						|
 | 
						|
            lines.append(
 | 
						|
                "plugins: %s" % ", ".join(_plugin_nameversions(plugininfo)))
 | 
						|
        return lines
 | 
						|
 | 
						|
    def pytest_collection_finish(self, session):
 | 
						|
        if self.config.option.collectonly:
 | 
						|
            self._printcollecteditems(session.items)
 | 
						|
            if self.stats.get('failed'):
 | 
						|
                self._tw.sep("!", "collection failures")
 | 
						|
                for rep in self.stats.get('failed'):
 | 
						|
                    rep.toterminal(self._tw)
 | 
						|
                return 1
 | 
						|
            return 0
 | 
						|
        lines = self.config.hook.pytest_report_collectionfinish(
 | 
						|
            config=self.config, startdir=self.startdir, items=session.items)
 | 
						|
        self._write_report_lines_from_hooks(lines)
 | 
						|
 | 
						|
    def _printcollecteditems(self, items):
 | 
						|
        # to print out items and their parent collectors
 | 
						|
        # we take care to leave out Instances aka ()
 | 
						|
        # because later versions are going to get rid of them anyway
 | 
						|
        if self.config.option.verbose < 0:
 | 
						|
            if self.config.option.verbose < -1:
 | 
						|
                counts = {}
 | 
						|
                for item in items:
 | 
						|
                    name = item.nodeid.split('::', 1)[0]
 | 
						|
                    counts[name] = counts.get(name, 0) + 1
 | 
						|
                for name, count in sorted(counts.items()):
 | 
						|
                    self._tw.line("%s: %d" % (name, count))
 | 
						|
            else:
 | 
						|
                for item in items:
 | 
						|
                    nodeid = item.nodeid
 | 
						|
                    nodeid = nodeid.replace("::()::", "::")
 | 
						|
                    self._tw.line(nodeid)
 | 
						|
            return
 | 
						|
        stack = []
 | 
						|
        indent = ""
 | 
						|
        for item in items:
 | 
						|
            needed_collectors = item.listchain()[1:]  # strip root node
 | 
						|
            while stack:
 | 
						|
                if stack == needed_collectors[:len(stack)]:
 | 
						|
                    break
 | 
						|
                stack.pop()
 | 
						|
            for col in needed_collectors[len(stack):]:
 | 
						|
                stack.append(col)
 | 
						|
                # if col.name == "()":
 | 
						|
                #    continue
 | 
						|
                indent = (len(stack) - 1) * "  "
 | 
						|
                self._tw.line("%s%s" % (indent, col))
 | 
						|
 | 
						|
    @pytest.hookimpl(hookwrapper=True)
 | 
						|
    def pytest_sessionfinish(self, exitstatus):
 | 
						|
        outcome = yield
 | 
						|
        outcome.get_result()
 | 
						|
        self._tw.line("")
 | 
						|
        summary_exit_codes = (
 | 
						|
            EXIT_OK, EXIT_TESTSFAILED, EXIT_INTERRUPTED, EXIT_USAGEERROR,
 | 
						|
            EXIT_NOTESTSCOLLECTED)
 | 
						|
        if exitstatus in summary_exit_codes:
 | 
						|
            self.config.hook.pytest_terminal_summary(terminalreporter=self,
 | 
						|
                                                     exitstatus=exitstatus)
 | 
						|
            self.summary_errors()
 | 
						|
            self.summary_failures()
 | 
						|
            self.summary_warnings()
 | 
						|
            self.summary_passes()
 | 
						|
        if exitstatus == EXIT_INTERRUPTED:
 | 
						|
            self._report_keyboardinterrupt()
 | 
						|
            del self._keyboardinterrupt_memo
 | 
						|
        self.summary_deselected()
 | 
						|
        self.summary_stats()
 | 
						|
 | 
						|
    def pytest_keyboard_interrupt(self, excinfo):
 | 
						|
        self._keyboardinterrupt_memo = excinfo.getrepr(funcargs=True)
 | 
						|
 | 
						|
    def pytest_unconfigure(self):
 | 
						|
        if hasattr(self, '_keyboardinterrupt_memo'):
 | 
						|
            self._report_keyboardinterrupt()
 | 
						|
 | 
						|
    def _report_keyboardinterrupt(self):
 | 
						|
        excrepr = self._keyboardinterrupt_memo
 | 
						|
        msg = excrepr.reprcrash.message
 | 
						|
        self.write_sep("!", msg)
 | 
						|
        if "KeyboardInterrupt" in msg:
 | 
						|
            if self.config.option.fulltrace:
 | 
						|
                excrepr.toterminal(self._tw)
 | 
						|
            else:
 | 
						|
                self._tw.line("to show a full traceback on KeyboardInterrupt use --fulltrace", yellow=True)
 | 
						|
                excrepr.reprcrash.toterminal(self._tw)
 | 
						|
 | 
						|
    def _locationline(self, nodeid, fspath, lineno, domain):
 | 
						|
        def mkrel(nodeid):
 | 
						|
            line = self.config.cwd_relative_nodeid(nodeid)
 | 
						|
            if domain and line.endswith(domain):
 | 
						|
                line = line[:-len(domain)]
 | 
						|
                values = domain.split("[")
 | 
						|
                values[0] = values[0].replace('.', '::')  # don't replace '.' in params
 | 
						|
                line += "[".join(values)
 | 
						|
            return line
 | 
						|
        # collect_fspath comes from testid which has a "/"-normalized path
 | 
						|
 | 
						|
        if fspath:
 | 
						|
            res = mkrel(nodeid).replace("::()", "")  # parens-normalization
 | 
						|
            if nodeid.split("::")[0] != fspath.replace("\\", nodes.SEP):
 | 
						|
                res += " <- " + self.startdir.bestrelpath(fspath)
 | 
						|
        else:
 | 
						|
            res = "[location]"
 | 
						|
        return res + " "
 | 
						|
 | 
						|
    def _getfailureheadline(self, rep):
 | 
						|
        if hasattr(rep, 'location'):
 | 
						|
            fspath, lineno, domain = rep.location
 | 
						|
            return domain
 | 
						|
        else:
 | 
						|
            return "test session"  # XXX?
 | 
						|
 | 
						|
    def _getcrashline(self, rep):
 | 
						|
        try:
 | 
						|
            return str(rep.longrepr.reprcrash)
 | 
						|
        except AttributeError:
 | 
						|
            try:
 | 
						|
                return str(rep.longrepr)[:50]
 | 
						|
            except AttributeError:
 | 
						|
                return ""
 | 
						|
 | 
						|
    #
 | 
						|
    # summaries for sessionfinish
 | 
						|
    #
 | 
						|
    def getreports(self, name):
 | 
						|
        values = []
 | 
						|
        for x in self.stats.get(name, []):
 | 
						|
            if not hasattr(x, '_pdbshown'):
 | 
						|
                values.append(x)
 | 
						|
        return values
 | 
						|
 | 
						|
    def summary_warnings(self):
 | 
						|
        if self.hasopt("w"):
 | 
						|
            all_warnings = self.stats.get("warnings")
 | 
						|
            if not all_warnings:
 | 
						|
                return
 | 
						|
 | 
						|
            grouped = itertools.groupby(all_warnings, key=lambda wr: wr.get_location(self.config))
 | 
						|
 | 
						|
            self.write_sep("=", "warnings summary", yellow=True, bold=False)
 | 
						|
            for location, warning_records in grouped:
 | 
						|
                self._tw.line(str(location) or '<undetermined location>')
 | 
						|
                for w in warning_records:
 | 
						|
                    lines = w.message.splitlines()
 | 
						|
                    indented = '\n'.join('  ' + x for x in lines)
 | 
						|
                    self._tw.line(indented)
 | 
						|
                self._tw.line()
 | 
						|
            self._tw.line('-- Docs: http://doc.pytest.org/en/latest/warnings.html')
 | 
						|
 | 
						|
    def summary_passes(self):
 | 
						|
        if self.config.option.tbstyle != "no":
 | 
						|
            if self.hasopt("P"):
 | 
						|
                reports = self.getreports('passed')
 | 
						|
                if not reports:
 | 
						|
                    return
 | 
						|
                self.write_sep("=", "PASSES")
 | 
						|
                for rep in reports:
 | 
						|
                    msg = self._getfailureheadline(rep)
 | 
						|
                    self.write_sep("_", msg)
 | 
						|
                    self._outrep_summary(rep)
 | 
						|
 | 
						|
    def print_teardown_sections(self, rep):
 | 
						|
        for secname, content in rep.sections:
 | 
						|
            if 'teardown' in secname:
 | 
						|
                self._tw.sep('-', secname)
 | 
						|
                if content[-1:] == "\n":
 | 
						|
                    content = content[:-1]
 | 
						|
                self._tw.line(content)
 | 
						|
 | 
						|
    def summary_failures(self):
 | 
						|
        if self.config.option.tbstyle != "no":
 | 
						|
            reports = self.getreports('failed')
 | 
						|
            if not reports:
 | 
						|
                return
 | 
						|
            self.write_sep("=", "FAILURES")
 | 
						|
            for rep in reports:
 | 
						|
                if self.config.option.tbstyle == "line":
 | 
						|
                    line = self._getcrashline(rep)
 | 
						|
                    self.write_line(line)
 | 
						|
                else:
 | 
						|
                    msg = self._getfailureheadline(rep)
 | 
						|
                    markup = {'red': True, 'bold': True}
 | 
						|
                    self.write_sep("_", msg, **markup)
 | 
						|
                    self._outrep_summary(rep)
 | 
						|
                    for report in self.getreports(''):
 | 
						|
                        if report.nodeid == rep.nodeid and report.when == 'teardown':
 | 
						|
                            self.print_teardown_sections(report)
 | 
						|
 | 
						|
    def summary_errors(self):
 | 
						|
        if self.config.option.tbstyle != "no":
 | 
						|
            reports = self.getreports('error')
 | 
						|
            if not reports:
 | 
						|
                return
 | 
						|
            self.write_sep("=", "ERRORS")
 | 
						|
            for rep in self.stats['error']:
 | 
						|
                msg = self._getfailureheadline(rep)
 | 
						|
                if not hasattr(rep, 'when'):
 | 
						|
                    # collect
 | 
						|
                    msg = "ERROR collecting " + msg
 | 
						|
                elif rep.when == "setup":
 | 
						|
                    msg = "ERROR at setup of " + msg
 | 
						|
                elif rep.when == "teardown":
 | 
						|
                    msg = "ERROR at teardown of " + msg
 | 
						|
                self.write_sep("_", msg)
 | 
						|
                self._outrep_summary(rep)
 | 
						|
 | 
						|
    def _outrep_summary(self, rep):
 | 
						|
        rep.toterminal(self._tw)
 | 
						|
        for secname, content in rep.sections:
 | 
						|
            self._tw.sep("-", secname)
 | 
						|
            if content[-1:] == "\n":
 | 
						|
                content = content[:-1]
 | 
						|
            self._tw.line(content)
 | 
						|
 | 
						|
    def summary_stats(self):
 | 
						|
        session_duration = time.time() - self._sessionstarttime
 | 
						|
        (line, color) = build_summary_stats_line(self.stats)
 | 
						|
        msg = "%s in %.2f seconds" % (line, session_duration)
 | 
						|
        markup = {color: True, 'bold': True}
 | 
						|
 | 
						|
        if self.verbosity >= 0:
 | 
						|
            self.write_sep("=", msg, **markup)
 | 
						|
        if self.verbosity == -1:
 | 
						|
            self.write_line(msg, **markup)
 | 
						|
 | 
						|
    def summary_deselected(self):
 | 
						|
        if 'deselected' in self.stats:
 | 
						|
            self.write_sep("=", "%d tests deselected" % (
 | 
						|
                len(self.stats['deselected'])), bold=True)
 | 
						|
 | 
						|
 | 
						|
def repr_pythonversion(v=None):
 | 
						|
    if v is None:
 | 
						|
        v = sys.version_info
 | 
						|
    try:
 | 
						|
        return "%s.%s.%s-%s-%s" % v
 | 
						|
    except (TypeError, ValueError):
 | 
						|
        return str(v)
 | 
						|
 | 
						|
 | 
						|
def flatten(values):
 | 
						|
    for x in values:
 | 
						|
        if isinstance(x, (list, tuple)):
 | 
						|
            for y in flatten(x):
 | 
						|
                yield y
 | 
						|
        else:
 | 
						|
            yield x
 | 
						|
 | 
						|
 | 
						|
def build_summary_stats_line(stats):
 | 
						|
    keys = ("failed passed skipped deselected "
 | 
						|
            "xfailed xpassed warnings error").split()
 | 
						|
    unknown_key_seen = False
 | 
						|
    for key in stats.keys():
 | 
						|
        if key not in keys:
 | 
						|
            if key:  # setup/teardown reports have an empty key, ignore them
 | 
						|
                keys.append(key)
 | 
						|
                unknown_key_seen = True
 | 
						|
    parts = []
 | 
						|
    for key in keys:
 | 
						|
        val = stats.get(key, None)
 | 
						|
        if val:
 | 
						|
            parts.append("%d %s" % (len(val), key))
 | 
						|
 | 
						|
    if parts:
 | 
						|
        line = ", ".join(parts)
 | 
						|
    else:
 | 
						|
        line = "no tests ran"
 | 
						|
 | 
						|
    if 'failed' in stats or 'error' in stats:
 | 
						|
        color = 'red'
 | 
						|
    elif 'warnings' in stats or unknown_key_seen:
 | 
						|
        color = 'yellow'
 | 
						|
    elif 'passed' in stats:
 | 
						|
        color = 'green'
 | 
						|
    else:
 | 
						|
        color = 'yellow'
 | 
						|
 | 
						|
    return (line, color)
 | 
						|
 | 
						|
 | 
						|
def _plugin_nameversions(plugininfo):
 | 
						|
    values = []
 | 
						|
    for plugin, dist in plugininfo:
 | 
						|
        # gets us name and version!
 | 
						|
        name = '{dist.project_name}-{dist.version}'.format(dist=dist)
 | 
						|
        # questionable convenience, but it keeps things short
 | 
						|
        if name.startswith("pytest-"):
 | 
						|
            name = name[7:]
 | 
						|
        # we decided to print python package names
 | 
						|
        # they can have more than one plugin
 | 
						|
        if name not in values:
 | 
						|
            values.append(name)
 | 
						|
    return values
 |