402 lines
		
	
	
		
			14 KiB
		
	
	
	
		
			Python
		
	
	
	
			
		
		
	
	
			402 lines
		
	
	
		
			14 KiB
		
	
	
	
		
			Python
		
	
	
	
"""
 | 
						|
terminal reporting of the full testing process.
 | 
						|
"""
 | 
						|
import py
 | 
						|
import sys
 | 
						|
 | 
						|
def pytest_configure(config):
 | 
						|
    if config.option.collectonly:
 | 
						|
        reporter = CollectonlyReporter(config)
 | 
						|
    else:
 | 
						|
        reporter = TerminalReporter(config)
 | 
						|
    # XXX see remote.py's XXX 
 | 
						|
    for attr in 'pytest_terminal_hasmarkup', 'pytest_terminal_fullwidth':
 | 
						|
        if hasattr(config, attr):
 | 
						|
            #print "SETTING TERMINAL OPTIONS", attr, getattr(config, attr)
 | 
						|
            name = attr.split("_")[-1]
 | 
						|
            assert hasattr(self.reporter._tw, name), name
 | 
						|
            setattr(reporter._tw, name, getattr(config, attr))
 | 
						|
    config.pluginmanager.register(reporter)
 | 
						|
 | 
						|
class TerminalReporter:
 | 
						|
    def __init__(self, config, file=None):
 | 
						|
        self.config = config 
 | 
						|
        self.stats = {}       
 | 
						|
        self.curdir = py.path.local()
 | 
						|
        if file is None:
 | 
						|
            file = py.std.sys.stdout
 | 
						|
        self._tw = py.io.TerminalWriter(file)
 | 
						|
        self.currentfspath = None 
 | 
						|
        self.gateway2info = {}
 | 
						|
 | 
						|
    def write_fspath_result(self, fspath, res):
 | 
						|
        fspath = self.curdir.bestrelpath(fspath)
 | 
						|
        if fspath != self.currentfspath:
 | 
						|
            self._tw.line()
 | 
						|
            relpath = self.curdir.bestrelpath(fspath)
 | 
						|
            self._tw.write(relpath + " ")
 | 
						|
            self.currentfspath = fspath
 | 
						|
        self._tw.write(res)
 | 
						|
 | 
						|
    def write_ensure_prefix(self, prefix, extra="", **kwargs):
 | 
						|
        if self.currentfspath != prefix:
 | 
						|
            self._tw.line()
 | 
						|
            self.currentfspath = prefix 
 | 
						|
            self._tw.write(prefix)
 | 
						|
        if extra:
 | 
						|
            self._tw.write(extra, **kwargs)
 | 
						|
            self.currentfspath = -2
 | 
						|
 | 
						|
    def ensure_newline(self):
 | 
						|
        if self.currentfspath: 
 | 
						|
            self._tw.line()
 | 
						|
            self.currentfspath = None
 | 
						|
 | 
						|
    def write_line(self, line, **markup):
 | 
						|
        line = str(line)
 | 
						|
        self.ensure_newline()
 | 
						|
        self._tw.line(line, **markup)
 | 
						|
 | 
						|
    def write_sep(self, sep, title=None, **markup):
 | 
						|
        self.ensure_newline()
 | 
						|
        self._tw.sep(sep, title, **markup)
 | 
						|
 | 
						|
    def getcategoryletterword(self, rep):
 | 
						|
        res = self.config.hook.pytest_report_teststatus(rep=rep)
 | 
						|
        if res:
 | 
						|
            return res
 | 
						|
        for cat in 'skipped failed passed ???'.split():
 | 
						|
            if getattr(rep, cat, None):
 | 
						|
                break 
 | 
						|
        return cat, self.getoutcomeletter(rep), self.getoutcomeword(rep)
 | 
						|
 | 
						|
    def getoutcomeletter(self, rep):
 | 
						|
        return rep.shortrepr 
 | 
						|
 | 
						|
    def getoutcomeword(self, rep):
 | 
						|
        if rep.passed: 
 | 
						|
            return "PASS", dict(green=True)
 | 
						|
        elif rep.failed: 
 | 
						|
            return "FAIL", dict(red=True)
 | 
						|
        elif rep.skipped: 
 | 
						|
            return "SKIP"
 | 
						|
        else: 
 | 
						|
            return "???", dict(red=True)
 | 
						|
 | 
						|
    def pytest_internalerror(self, excrepr):
 | 
						|
        for line in str(excrepr).split("\n"):
 | 
						|
            self.write_line("INTERNALERROR> " + line)
 | 
						|
 | 
						|
    def pyexecnet_gwmanage_newgateway(self, gateway, platinfo):
 | 
						|
        #self.write_line("%s instantiated gateway from spec %r" %(gateway.id, gateway.spec._spec))
 | 
						|
        d = {}
 | 
						|
        d['version'] = repr_pythonversion(platinfo.version_info)
 | 
						|
        d['id'] = gateway.id
 | 
						|
        d['spec'] = gateway.spec._spec 
 | 
						|
        d['platform'] = platinfo.platform 
 | 
						|
        if self.config.option.verbose:
 | 
						|
            d['extra'] = "- " + platinfo.executable
 | 
						|
        else:
 | 
						|
            d['extra'] = ""
 | 
						|
        d['cwd'] = platinfo.cwd
 | 
						|
        infoline = ("%(id)s %(spec)s -- platform %(platform)s, "
 | 
						|
                        "Python %(version)s "
 | 
						|
                        "cwd: %(cwd)s"
 | 
						|
                        "%(extra)s" % d)
 | 
						|
        self.write_line(infoline)
 | 
						|
        self.gateway2info[gateway] = infoline
 | 
						|
 | 
						|
    def pyexecnet_gwmanage_rsyncstart(self, source, gateways):
 | 
						|
        targets = ", ".join([gw.id for gw in gateways])
 | 
						|
        msg = "rsyncstart: %s -> %s" %(source, targets)
 | 
						|
        if not self.config.option.verbose:
 | 
						|
            msg += " # use --verbose to see rsync progress"
 | 
						|
        self.write_line(msg)
 | 
						|
 | 
						|
    def pyexecnet_gwmanage_rsyncfinish(self, source, gateways):
 | 
						|
        targets = ", ".join([gw.id for gw in gateways])
 | 
						|
        self.write_line("rsyncfinish: %s -> %s" %(source, targets))
 | 
						|
 | 
						|
    def pytest_plugin_registered(self, plugin):
 | 
						|
        if self.config.option.traceconfig: 
 | 
						|
            msg = "PLUGIN registered: %s" %(plugin,)
 | 
						|
            # XXX this event may happen during setup/teardown time 
 | 
						|
            #     which unfortunately captures our output here 
 | 
						|
            #     which garbles our output if we use self.write_line 
 | 
						|
            self.write_line(msg)
 | 
						|
 | 
						|
    def pytest_testnodeready(self, node):
 | 
						|
        self.write_line("%s txnode ready to receive tests" %(node.gateway.id,))
 | 
						|
 | 
						|
    def pytest_testnodedown(self, node, error):
 | 
						|
        if error:
 | 
						|
            self.write_line("%s node down, error: %s" %(node.gateway.id, error))
 | 
						|
 | 
						|
    def pytest_trace(self, category, msg):
 | 
						|
        if self.config.option.debug or \
 | 
						|
           self.config.option.traceconfig and category.find("config") != -1:
 | 
						|
            self.write_line("[%s] %s" %(category, msg))
 | 
						|
 | 
						|
    def pytest_rescheduleitems(self, items):
 | 
						|
        if self.config.option.debug:
 | 
						|
            self.write_sep("!", "RESCHEDULING %s " %(items,))
 | 
						|
 | 
						|
    def pytest_deselected(self, items):
 | 
						|
        self.stats.setdefault('deselected', []).append(items)
 | 
						|
 | 
						|
    def pytest_itemstart(self, item, node=None):
 | 
						|
        if self.config.option.dist != "no":
 | 
						|
            # for dist-testing situations itemstart means we 
 | 
						|
            # queued the item for sending, not interesting (unless debugging) 
 | 
						|
            if self.config.option.debug:
 | 
						|
                line = self._reportinfoline(item)
 | 
						|
                extra = ""
 | 
						|
                if node:
 | 
						|
                    extra = "-> " + str(node.gateway.id)
 | 
						|
                self.write_ensure_prefix(line, extra)
 | 
						|
        else:
 | 
						|
            if self.config.option.verbose:
 | 
						|
                line = self._reportinfoline(item)
 | 
						|
                self.write_ensure_prefix(line, "") 
 | 
						|
            else:
 | 
						|
                # ensure that the path is printed before the 
 | 
						|
                # 1st test of a module starts running
 | 
						|
                fspath, lineno, msg = self._getreportinfo(item)
 | 
						|
                self.write_fspath_result(fspath, "")
 | 
						|
 | 
						|
    def pytest_runtest_logreport(self, rep):
 | 
						|
        if rep.passed and rep.when in ("setup", "teardown"):
 | 
						|
            return 
 | 
						|
        fspath = rep.item.fspath 
 | 
						|
        cat, letter, word = self.getcategoryletterword(rep)
 | 
						|
        if isinstance(word, tuple):
 | 
						|
            word, markup = word
 | 
						|
        else:
 | 
						|
            markup = {}
 | 
						|
        self.stats.setdefault(cat, []).append(rep)
 | 
						|
        if not self.config.option.verbose:
 | 
						|
            fspath, lineno, msg = self._getreportinfo(rep.item)
 | 
						|
            self.write_fspath_result(fspath, letter)
 | 
						|
        else:
 | 
						|
            line = self._reportinfoline(rep.item)
 | 
						|
            if not hasattr(rep, 'node'):
 | 
						|
                self.write_ensure_prefix(line, word, **markup)
 | 
						|
            else:
 | 
						|
                self.ensure_newline()
 | 
						|
                if hasattr(rep, 'node'):
 | 
						|
                    self._tw.write("%s " % rep.node.gateway.id)
 | 
						|
                self._tw.write(word, **markup)
 | 
						|
                self._tw.write(" " + line)
 | 
						|
                self.currentfspath = -2
 | 
						|
 | 
						|
    def pytest_collectreport(self, rep):
 | 
						|
        if not rep.passed:
 | 
						|
            if rep.failed:
 | 
						|
                self.stats.setdefault("failed", []).append(rep)
 | 
						|
                msg = rep.longrepr.reprcrash.message 
 | 
						|
                self.write_fspath_result(rep.collector.fspath, "F")
 | 
						|
            elif rep.skipped:
 | 
						|
                self.stats.setdefault("skipped", []).append(rep)
 | 
						|
                self.write_fspath_result(rep.collector.fspath, "S")
 | 
						|
 | 
						|
    def pytest_sessionstart(self, session):
 | 
						|
        self.write_sep("=", "test session starts", bold=True)
 | 
						|
        self._sessionstarttime = py.std.time.time()
 | 
						|
 | 
						|
        verinfo = ".".join(map(str, sys.version_info[:3]))
 | 
						|
        msg = "python: platform %s -- Python %s" % (sys.platform, verinfo)
 | 
						|
        if self.config.option.verbose or self.config.option.debug:
 | 
						|
            msg += " -- " + str(sys.executable)
 | 
						|
            msg += " -- pytest-%s" % (py.__version__)
 | 
						|
        self.write_line(msg)
 | 
						|
 | 
						|
        if self.config.option.debug or self.config.option.traceconfig:
 | 
						|
            rev = py.__pkg__.getrev()
 | 
						|
            self.write_line("using py lib: %s <rev %s>" % (
 | 
						|
                           py.path.local(py.__file__).dirpath(), rev))
 | 
						|
        if self.config.option.traceconfig:
 | 
						|
            plugins = []
 | 
						|
            for plugin in self.config.pluginmanager.comregistry:
 | 
						|
                name = plugin.__class__.__name__
 | 
						|
                if name.endswith("Plugin"):
 | 
						|
                    name = name[:-6]
 | 
						|
                    #if name == "Conftest":
 | 
						|
                    #    XXX get filename 
 | 
						|
                    plugins.append(name)
 | 
						|
                else:
 | 
						|
                    plugins.append(str(plugin))
 | 
						|
 | 
						|
            plugins = ", ".join(plugins) 
 | 
						|
            self.write_line("active plugins: %s" %(plugins,))
 | 
						|
        for i, testarg in py.builtin.enumerate(self.config.args):
 | 
						|
            self.write_line("test object %d: %s" %(i+1, testarg))
 | 
						|
 | 
						|
    def pytest_sessionfinish(self, __call__, session, exitstatus):
 | 
						|
        __call__.execute() 
 | 
						|
        self._tw.line("")
 | 
						|
        if exitstatus in (0, 1, 2):
 | 
						|
            self.summary_failures()
 | 
						|
            self.summary_skips()
 | 
						|
            self.config.hook.pytest_terminal_summary(terminalreporter=self)
 | 
						|
        if exitstatus == 2:
 | 
						|
            self._report_keyboardinterrupt()
 | 
						|
        self.summary_deselected()
 | 
						|
        self.summary_stats()
 | 
						|
 | 
						|
    def pytest_keyboard_interrupt(self, excinfo):
 | 
						|
        self._keyboardinterrupt_memo = excinfo.getrepr()
 | 
						|
 | 
						|
    def _report_keyboardinterrupt(self):
 | 
						|
        self.write_sep("!", "KEYBOARD INTERRUPT")
 | 
						|
        excrepr = self._keyboardinterrupt_memo
 | 
						|
        if self.config.option.verbose:
 | 
						|
            excrepr.toterminal(self._tw)
 | 
						|
        else:
 | 
						|
            excrepr.reprcrash.toterminal(self._tw)
 | 
						|
 | 
						|
    def pytest_looponfailinfo(self, failreports, rootdirs):
 | 
						|
        if failreports:
 | 
						|
            self.write_sep("#", "LOOPONFAILING", red=True)
 | 
						|
            for report in failreports:
 | 
						|
                try:
 | 
						|
                    loc = report.longrepr.reprcrash
 | 
						|
                except AttributeError:
 | 
						|
                    loc = str(report.longrepr)[:50]
 | 
						|
                self.write_line(loc, red=True)
 | 
						|
        self.write_sep("#", "waiting for changes")
 | 
						|
        for rootdir in rootdirs:
 | 
						|
            self.write_line("### Watching:   %s" %(rootdir,), bold=True)
 | 
						|
 | 
						|
    def _reportinfoline(self, item):
 | 
						|
        fspath, lineno, msg = self._getreportinfo(item)
 | 
						|
        if fspath:
 | 
						|
            fspath = self.curdir.bestrelpath(fspath)
 | 
						|
        if lineno is not None:
 | 
						|
            lineno += 1
 | 
						|
        if fspath and lineno and msg:
 | 
						|
            line = "%(fspath)s:%(lineno)s: %(msg)s"
 | 
						|
        elif fspath and msg:
 | 
						|
            line = "%(fspath)s: %(msg)s"
 | 
						|
        elif fspath and lineno:
 | 
						|
            line = "%(fspath)s:%(lineno)s"
 | 
						|
        else:
 | 
						|
            line = "[noreportinfo]"
 | 
						|
        return line % locals() + " "
 | 
						|
        
 | 
						|
    def _getfailureheadline(self, rep):
 | 
						|
        if hasattr(rep, "collector"):
 | 
						|
            return str(rep.collector.fspath)
 | 
						|
        else:
 | 
						|
            fspath, lineno, msg = self._getreportinfo(rep.item)
 | 
						|
            return msg
 | 
						|
 | 
						|
    def _getreportinfo(self, item):
 | 
						|
        try:
 | 
						|
            return item.__reportinfo
 | 
						|
        except AttributeError:
 | 
						|
            pass
 | 
						|
        reportinfo = item.config.hook.pytest_report_iteminfo(item=item)
 | 
						|
        # cache on item
 | 
						|
        item.__reportinfo = reportinfo
 | 
						|
        return reportinfo
 | 
						|
 | 
						|
    #
 | 
						|
    # summaries for sessionfinish 
 | 
						|
    #
 | 
						|
 | 
						|
    def summary_failures(self):
 | 
						|
        if 'failed' in self.stats and self.config.option.tbstyle != "no":
 | 
						|
            self.write_sep("=", "FAILURES")
 | 
						|
            for rep in self.stats['failed']:
 | 
						|
                msg = self._getfailureheadline(rep)
 | 
						|
                self.write_sep("_", msg)
 | 
						|
                if hasattr(rep, 'node'):
 | 
						|
                    self.write_line(self.gateway2info.get(
 | 
						|
                        rep.node.gateway, "node %r (platinfo not found? strange)")
 | 
						|
                            [:self._tw.fullwidth-1])
 | 
						|
                rep.toterminal(self._tw)
 | 
						|
 | 
						|
    def summary_stats(self):
 | 
						|
        session_duration = py.std.time.time() - self._sessionstarttime
 | 
						|
 | 
						|
        keys = "failed passed skipped deselected".split()
 | 
						|
        parts = []
 | 
						|
        for key in keys:
 | 
						|
            val = self.stats.get(key, None)
 | 
						|
            if val:
 | 
						|
                parts.append("%d %s" %(len(val), key))
 | 
						|
        line = ", ".join(parts)
 | 
						|
        # XXX coloring
 | 
						|
        self.write_sep("=", "%s in %.2f seconds" %(line, session_duration))
 | 
						|
 | 
						|
    def summary_deselected(self):
 | 
						|
        if 'deselected' in self.stats:
 | 
						|
            self.write_sep("=", "%d tests deselected by %r" %(
 | 
						|
                len(self.stats['deselected']), self.config.option.keyword), bold=True)
 | 
						|
 | 
						|
    def summary_skips(self):
 | 
						|
        if 'skipped' in self.stats:
 | 
						|
            if 'failed' not in self.stats: #  or self.config.option.showskipsummary:
 | 
						|
                fskips = folded_skips(self.stats['skipped'])
 | 
						|
                if fskips:
 | 
						|
                    self.write_sep("_", "skipped test summary")
 | 
						|
                    for num, fspath, lineno, reason in fskips:
 | 
						|
                        self._tw.line("%s:%d: [%d] %s" %(fspath, lineno, num, reason))
 | 
						|
 | 
						|
class CollectonlyReporter:
 | 
						|
    INDENT = "  "
 | 
						|
 | 
						|
    def __init__(self, config, out=None):
 | 
						|
        self.config = config 
 | 
						|
        if out is None:
 | 
						|
            out = py.std.sys.stdout
 | 
						|
        self.out = py.io.TerminalWriter(out)
 | 
						|
        self.indent = ""
 | 
						|
        self._failed = []
 | 
						|
 | 
						|
    def outindent(self, line):
 | 
						|
        self.out.line(self.indent + str(line))
 | 
						|
 | 
						|
    def pytest_internalerror(self, excrepr):
 | 
						|
        for line in str(excrepr).split("\n"):
 | 
						|
            self.out.line("INTERNALERROR> " + line)
 | 
						|
 | 
						|
    def pytest_collectstart(self, collector):
 | 
						|
        self.outindent(collector)
 | 
						|
        self.indent += self.INDENT 
 | 
						|
    
 | 
						|
    def pytest_itemstart(self, item, node=None):
 | 
						|
        self.outindent(item)
 | 
						|
 | 
						|
    def pytest_collectreport(self, rep):
 | 
						|
        if not rep.passed:
 | 
						|
            self.outindent("!!! %s !!!" % rep.longrepr.reprcrash.message)
 | 
						|
            self._failed.append(rep)
 | 
						|
        self.indent = self.indent[:-len(self.INDENT)]
 | 
						|
 | 
						|
    def pytest_sessionfinish(self, session, exitstatus):
 | 
						|
        if self._failed:
 | 
						|
            self.out.sep("!", "collection failures")
 | 
						|
        for rep in self._failed:
 | 
						|
            rep.toterminal(self.out)
 | 
						|
                
 | 
						|
def folded_skips(skipped):
 | 
						|
    d = {}
 | 
						|
    for event in skipped:
 | 
						|
        entry = event.longrepr.reprcrash 
 | 
						|
        key = entry.path, entry.lineno, entry.message
 | 
						|
        d.setdefault(key, []).append(event)
 | 
						|
    l = []
 | 
						|
    for key, events in d.iteritems(): 
 | 
						|
        l.append((len(events),) + key)
 | 
						|
    return l 
 | 
						|
 | 
						|
def repr_pythonversion(v=None):
 | 
						|
    if v is None:
 | 
						|
        v = sys.version_info
 | 
						|
    try:
 | 
						|
        return "%s.%s.%s-%s-%s" % v
 | 
						|
    except (TypeError, ValueError):
 | 
						|
        return str(v)
 | 
						|
 |