majorly refactor collection process
- get rid of py.test.collect.Directory alltogether. - introduce direct node.nodeid attribute - remove now superflous attributes on collect and test reports
This commit is contained in:
@@ -133,7 +133,11 @@ class CaptureManager:
|
||||
|
||||
def pytest_make_collect_report(self, __multicall__, collector):
|
||||
method = self._getmethod(collector.config, collector.fspath)
|
||||
self.resumecapture(method)
|
||||
try:
|
||||
self.resumecapture(method)
|
||||
except ValueError:
|
||||
return # recursive collect, XXX refactor capturing
|
||||
# to allow for more lightweight recursive capturing
|
||||
try:
|
||||
rep = __multicall__.execute()
|
||||
finally:
|
||||
|
||||
@@ -255,7 +255,7 @@ class Config(object):
|
||||
)
|
||||
#: a pluginmanager instance
|
||||
self.pluginmanager = pluginmanager or PluginManager(load=True)
|
||||
self.trace = self.pluginmanager.trace.get("config")
|
||||
self.trace = self.pluginmanager.trace.root.get("config")
|
||||
self._conftest = Conftest(onimport=self._onimportconftest)
|
||||
self.hook = self.pluginmanager.hook
|
||||
|
||||
|
||||
@@ -17,7 +17,7 @@ def pytest_addoption(parser):
|
||||
def pytest_collect_file(path, parent):
|
||||
config = parent.config
|
||||
if path.ext == ".py":
|
||||
if config.getvalue("doctestmodules"):
|
||||
if config.option.doctestmodules:
|
||||
return DoctestModule(path, parent)
|
||||
elif path.check(fnmatch=config.getvalue("doctestglob")):
|
||||
return DoctestTextfile(path, parent)
|
||||
|
||||
@@ -3,6 +3,7 @@
|
||||
"""
|
||||
|
||||
import py
|
||||
import os
|
||||
import time
|
||||
|
||||
def pytest_addoption(parser):
|
||||
@@ -36,7 +37,9 @@ class LogXML(object):
|
||||
self._durations = {}
|
||||
|
||||
def _opentestcase(self, report):
|
||||
names = report.nodenames
|
||||
names = report.nodeid.split("::")
|
||||
names[0] = names[0].replace(os.sep, '.')
|
||||
names = tuple(names)
|
||||
d = {'time': self._durations.pop(names, "0")}
|
||||
names = [x.replace(".py", "") for x in names if x != "()"]
|
||||
classnames = names[:-1]
|
||||
|
||||
@@ -105,6 +105,7 @@ class HookRecorder:
|
||||
return l
|
||||
|
||||
def contains(self, entries):
|
||||
__tracebackhide__ = True
|
||||
from py.builtin import print_
|
||||
i = 0
|
||||
entries = list(entries)
|
||||
@@ -123,8 +124,7 @@ class HookRecorder:
|
||||
break
|
||||
print_("NONAMEMATCH", name, "with", call)
|
||||
else:
|
||||
raise AssertionError("could not find %r in %r" %(
|
||||
name, self.calls[i:]))
|
||||
py.test.fail("could not find %r check %r" % (name, check))
|
||||
|
||||
def popcall(self, name):
|
||||
for i, call in enumerate(self.calls):
|
||||
@@ -278,7 +278,16 @@ class TmpTestdir:
|
||||
Collection = Collection
|
||||
def getnode(self, config, arg):
|
||||
collection = Collection(config)
|
||||
return collection.getbyid(collection._normalizearg(arg))[0]
|
||||
assert '::' not in str(arg)
|
||||
p = py.path.local(arg)
|
||||
x = collection.fspath.bestrelpath(p)
|
||||
return collection.perform_collect([x], genitems=False)[0]
|
||||
|
||||
def getpathnode(self, path):
|
||||
config = self.parseconfig(path)
|
||||
collection = Collection(config)
|
||||
x = collection.fspath.bestrelpath(path)
|
||||
return collection.perform_collect([x], genitems=False)[0]
|
||||
|
||||
def genitems(self, colitems):
|
||||
collection = colitems[0].collection
|
||||
@@ -291,8 +300,9 @@ class TmpTestdir:
|
||||
#config = self.parseconfig(*args)
|
||||
config = self.parseconfigure(*args)
|
||||
rec = self.getreportrecorder(config)
|
||||
items = Collection(config).perform_collect()
|
||||
return items, rec
|
||||
collection = Collection(config)
|
||||
collection.perform_collect()
|
||||
return collection.items, rec
|
||||
|
||||
def runitem(self, source):
|
||||
# used from runner functional tests
|
||||
@@ -469,11 +479,12 @@ class TmpTestdir:
|
||||
p = py.path.local.make_numbered_dir(prefix="runpytest-",
|
||||
keep=None, rootdir=self.tmpdir)
|
||||
args = ('--basetemp=%s' % p, ) + args
|
||||
for x in args:
|
||||
if '--confcutdir' in str(x):
|
||||
break
|
||||
else:
|
||||
args = ('--confcutdir=.',) + args
|
||||
#for x in args:
|
||||
# if '--confcutdir' in str(x):
|
||||
# break
|
||||
#else:
|
||||
# pass
|
||||
# args = ('--confcutdir=.',) + args
|
||||
plugins = [x for x in self.plugins if isinstance(x, str)]
|
||||
if plugins:
|
||||
args = ('-p', plugins[0]) + args
|
||||
@@ -530,7 +541,7 @@ class ReportRecorder(object):
|
||||
""" return a testreport whose dotted import path matches """
|
||||
l = []
|
||||
for rep in self.getreports(names=names):
|
||||
if not inamepart or inamepart in rep.nodenames:
|
||||
if not inamepart or inamepart in rep.nodeid.split("::"):
|
||||
l.append(rep)
|
||||
if not l:
|
||||
raise ValueError("could not find test report matching %r: no test reports at all!" %
|
||||
@@ -616,6 +627,8 @@ class LineMatcher:
|
||||
raise ValueError("line %r not found in output" % line)
|
||||
|
||||
def fnmatch_lines(self, lines2):
|
||||
def show(arg1, arg2):
|
||||
py.builtin.print_(arg1, arg2, file=py.std.sys.stderr)
|
||||
lines2 = self._getlines(lines2)
|
||||
lines1 = self.lines[:]
|
||||
nextline = None
|
||||
@@ -626,17 +639,17 @@ class LineMatcher:
|
||||
while lines1:
|
||||
nextline = lines1.pop(0)
|
||||
if line == nextline:
|
||||
print_("exact match:", repr(line))
|
||||
show("exact match:", repr(line))
|
||||
break
|
||||
elif fnmatch(nextline, line):
|
||||
print_("fnmatch:", repr(line))
|
||||
print_(" with:", repr(nextline))
|
||||
show("fnmatch:", repr(line))
|
||||
show(" with:", repr(nextline))
|
||||
break
|
||||
else:
|
||||
if not nomatchprinted:
|
||||
print_("nomatch:", repr(line))
|
||||
show("nomatch:", repr(line))
|
||||
nomatchprinted = True
|
||||
print_(" and:", repr(nextline))
|
||||
show(" and:", repr(nextline))
|
||||
extralines.append(nextline)
|
||||
else:
|
||||
assert line == nextline
|
||||
py.test.fail("remains unmatched: %r, see stderr" % (line,))
|
||||
|
||||
@@ -48,11 +48,10 @@ def pytest_pyfunc_call(__multicall__, pyfuncitem):
|
||||
def pytest_collect_file(path, parent):
|
||||
ext = path.ext
|
||||
pb = path.purebasename
|
||||
if pb.startswith("test_") or pb.endswith("_test") or \
|
||||
path in parent.collection._argfspaths:
|
||||
if ext == ".py":
|
||||
return parent.ihook.pytest_pycollect_makemodule(
|
||||
path=path, parent=parent)
|
||||
if ext == ".py" and (pb.startswith("test_") or pb.endswith("_test") or
|
||||
parent.collection.isinitpath(path)):
|
||||
return parent.ihook.pytest_pycollect_makemodule(
|
||||
path=path, parent=parent)
|
||||
|
||||
def pytest_pycollect_makemodule(path, parent):
|
||||
return Module(path, parent)
|
||||
@@ -713,11 +712,13 @@ class FuncargRequest:
|
||||
def showfuncargs(config):
|
||||
from pytest.plugin.session import Collection
|
||||
collection = Collection(config)
|
||||
firstid = collection._normalizearg(config.args[0])
|
||||
colitem = collection.getbyid(firstid)[0]
|
||||
collection.perform_collect()
|
||||
if collection.items:
|
||||
plugins = getplugins(collection.items[0])
|
||||
else:
|
||||
plugins = getplugins(collection)
|
||||
curdir = py.path.local()
|
||||
tw = py.io.TerminalWriter()
|
||||
plugins = getplugins(colitem, withpy=True)
|
||||
verbose = config.getvalue("verbose")
|
||||
for plugin in plugins:
|
||||
available = []
|
||||
|
||||
@@ -29,30 +29,12 @@ def pytest_sessionfinish(session, exitstatus):
|
||||
session.exitstatus = 1
|
||||
|
||||
class NodeInfo:
|
||||
def __init__(self, nodeid, nodenames, fspath, location):
|
||||
self.nodeid = nodeid
|
||||
self.nodenames = nodenames
|
||||
self.fspath = fspath
|
||||
def __init__(self, location):
|
||||
self.location = location
|
||||
|
||||
def getitemnodeinfo(item):
|
||||
try:
|
||||
return item._nodeinfo
|
||||
except AttributeError:
|
||||
location = item.reportinfo()
|
||||
location = (str(location[0]), location[1], str(location[2]))
|
||||
nodenames = tuple(item.listnames())
|
||||
nodeid = item.collection.getid(item)
|
||||
fspath = item.fspath
|
||||
item._nodeinfo = n = NodeInfo(nodeid, nodenames, fspath, location)
|
||||
return n
|
||||
|
||||
def pytest_runtest_protocol(item):
|
||||
nodeinfo = getitemnodeinfo(item)
|
||||
item.ihook.pytest_runtest_logstart(
|
||||
nodeid=nodeinfo.nodeid,
|
||||
location=nodeinfo.location,
|
||||
fspath=str(item.fspath),
|
||||
nodeid=item.nodeid, location=item.location,
|
||||
)
|
||||
runtestprotocol(item)
|
||||
return True
|
||||
@@ -142,16 +124,18 @@ class BaseReport(object):
|
||||
failed = property(lambda x: x.outcome == "failed")
|
||||
skipped = property(lambda x: x.outcome == "skipped")
|
||||
|
||||
@property
|
||||
def fspath(self):
|
||||
return self.nodeid.split("::")[0]
|
||||
|
||||
def pytest_runtest_makereport(item, call):
|
||||
nodeinfo = getitemnodeinfo(item)
|
||||
when = call.when
|
||||
keywords = dict([(x,1) for x in item.keywords])
|
||||
excinfo = call.excinfo
|
||||
if not call.excinfo:
|
||||
outcome = "passed"
|
||||
longrepr = None
|
||||
else:
|
||||
excinfo = call.excinfo
|
||||
if not isinstance(excinfo, py.code.ExceptionInfo):
|
||||
outcome = "failed"
|
||||
longrepr = excinfo
|
||||
@@ -164,25 +148,18 @@ def pytest_runtest_makereport(item, call):
|
||||
longrepr = item.repr_failure(excinfo)
|
||||
else: # exception in setup or teardown
|
||||
longrepr = item._repr_failure_py(excinfo)
|
||||
return TestReport(nodeinfo.nodeid, nodeinfo.nodenames,
|
||||
nodeinfo.fspath, nodeinfo.location,
|
||||
return TestReport(item.nodeid, item.location,
|
||||
keywords, outcome, longrepr, when)
|
||||
|
||||
class TestReport(BaseReport):
|
||||
""" Basic test report object (also used for setup and teardown calls if
|
||||
they fail).
|
||||
"""
|
||||
def __init__(self, nodeid, nodenames, fspath, location,
|
||||
def __init__(self, nodeid, location,
|
||||
keywords, outcome, longrepr, when):
|
||||
#: normalized collection node id
|
||||
self.nodeid = nodeid
|
||||
|
||||
#: list of names indicating position in collection tree.
|
||||
self.nodenames = nodenames
|
||||
|
||||
#: the collected path of the file containing the test.
|
||||
self.fspath = fspath # where the test was collected
|
||||
|
||||
#: a (filesystempath, lineno, domaininfo) tuple indicating the
|
||||
#: actual location of a test item - it might be different from the
|
||||
#: collected one e.g. if a method is inherited from a different module.
|
||||
@@ -212,39 +189,27 @@ class TeardownErrorReport(BaseReport):
|
||||
self.longrepr = longrepr
|
||||
|
||||
def pytest_make_collect_report(collector):
|
||||
result = excinfo = None
|
||||
try:
|
||||
result = collector._memocollect()
|
||||
except KeyboardInterrupt:
|
||||
raise
|
||||
except:
|
||||
excinfo = py.code.ExceptionInfo()
|
||||
nodenames = tuple(collector.listnames())
|
||||
nodeid = collector.collection.getid(collector)
|
||||
fspath = str(collector.fspath)
|
||||
call = CallInfo(collector._memocollect, "memocollect")
|
||||
reason = longrepr = None
|
||||
if not excinfo:
|
||||
if not call.excinfo:
|
||||
outcome = "passed"
|
||||
else:
|
||||
if excinfo.errisinstance(py.test.skip.Exception):
|
||||
if call.excinfo.errisinstance(py.test.skip.Exception):
|
||||
outcome = "skipped"
|
||||
reason = str(excinfo.value)
|
||||
longrepr = collector._repr_failure_py(excinfo, "line")
|
||||
reason = str(call.excinfo.value)
|
||||
longrepr = collector._repr_failure_py(call.excinfo, "line")
|
||||
else:
|
||||
outcome = "failed"
|
||||
errorinfo = collector.repr_failure(excinfo)
|
||||
errorinfo = collector.repr_failure(call.excinfo)
|
||||
if not hasattr(errorinfo, "toterminal"):
|
||||
errorinfo = CollectErrorRepr(errorinfo)
|
||||
longrepr = errorinfo
|
||||
return CollectReport(nodenames, nodeid, fspath,
|
||||
outcome, longrepr, result, reason)
|
||||
return CollectReport(collector.nodeid, outcome, longrepr,
|
||||
getattr(call, 'result', None), reason)
|
||||
|
||||
class CollectReport(BaseReport):
|
||||
def __init__(self, nodenames, nodeid, fspath, outcome,
|
||||
longrepr, result, reason):
|
||||
self.nodenames = nodenames
|
||||
def __init__(self, nodeid, outcome, longrepr, result, reason):
|
||||
self.nodeid = nodeid
|
||||
self.fspath = fspath
|
||||
self.outcome = outcome
|
||||
self.longrepr = longrepr
|
||||
self.result = result or []
|
||||
@@ -255,7 +220,8 @@ class CollectReport(BaseReport):
|
||||
return (self.fspath, None, self.fspath)
|
||||
|
||||
def __repr__(self):
|
||||
return "<CollectReport %r outcome=%r>" % (self.nodeid, self.outcome)
|
||||
return "<CollectReport %r lenresult=%s outcome=%r>" % (
|
||||
self.nodeid, len(self.result), self.outcome)
|
||||
|
||||
class CollectErrorRepr(TerminalRepr):
|
||||
def __init__(self, msg):
|
||||
|
||||
@@ -14,11 +14,15 @@ EXIT_OK = 0
|
||||
EXIT_TESTSFAILED = 1
|
||||
EXIT_INTERRUPTED = 2
|
||||
EXIT_INTERNALERROR = 3
|
||||
EXIT_NOHOSTS = 4
|
||||
|
||||
def pytest_addoption(parser):
|
||||
parser.addini("norecursedirs", "directory patterns to avoid for recursion",
|
||||
type="args", default=('.*', 'CVS', '_darcs', '{arch}'))
|
||||
#parser.addini("dirpatterns",
|
||||
# "patterns specifying possible locations of test files",
|
||||
# type="linelist", default=["**/test_*.txt",
|
||||
# "**/test_*.py", "**/*_test.py"]
|
||||
#)
|
||||
group = parser.getgroup("general", "running and selection options")
|
||||
group._addoption('-x', '--exitfirst', action="store_true", default=False,
|
||||
dest="exitfirst",
|
||||
@@ -44,12 +48,11 @@ def pytest_addoption(parser):
|
||||
|
||||
|
||||
def pytest_namespace():
|
||||
return dict(collect=dict(Item=Item, Collector=Collector,
|
||||
File=File, Directory=Directory))
|
||||
return dict(collect=dict(Item=Item, Collector=Collector, File=File))
|
||||
|
||||
def pytest_configure(config):
|
||||
py.test.config = config # compatibiltiy
|
||||
if config.getvalue("exitfirst"):
|
||||
if config.option.exitfirst:
|
||||
config.option.maxfail = 1
|
||||
|
||||
def pytest_cmdline_main(config):
|
||||
@@ -84,8 +87,10 @@ def pytest_cmdline_main(config):
|
||||
def pytest_collection(session):
|
||||
collection = session.collection
|
||||
assert not hasattr(collection, 'items')
|
||||
|
||||
collection.perform_collect()
|
||||
hook = session.config.hook
|
||||
collection.items = items = collection.perform_collect()
|
||||
items = collection.items
|
||||
hook.pytest_collection_modifyitems(config=session.config, items=items)
|
||||
hook.pytest_collection_finish(collection=collection)
|
||||
return True
|
||||
@@ -108,18 +113,6 @@ def pytest_ignore_collect(path, config):
|
||||
ignore_paths.extend([py.path.local(x) for x in excludeopt])
|
||||
return path in ignore_paths
|
||||
|
||||
def pytest_collect_directory(path, parent):
|
||||
# check if cmdline specified this dir or a subdir directly
|
||||
for arg in parent.collection._argfspaths:
|
||||
if path == arg or arg.relto(path):
|
||||
break
|
||||
else:
|
||||
patterns = parent.config.getini("norecursedirs")
|
||||
for pat in patterns or []:
|
||||
if path.check(fnmatch=pat):
|
||||
return
|
||||
return Directory(path, parent=parent)
|
||||
|
||||
class Session(object):
|
||||
class Interrupted(KeyboardInterrupt):
|
||||
""" signals an interrupted test run. """
|
||||
@@ -145,160 +138,17 @@ class Session(object):
|
||||
self._testsfailed)
|
||||
pytest_collectreport = pytest_runtest_logreport
|
||||
|
||||
class Collection:
|
||||
def __init__(self, config):
|
||||
self.config = config
|
||||
self.topdir = gettopdir(self.config.args)
|
||||
self._argfspaths = [py.path.local(decodearg(x)[0])
|
||||
for x in self.config.args]
|
||||
x = pytest.collect.Directory(fspath=self.topdir,
|
||||
config=config, collection=self)
|
||||
self._topcollector = x.consider_dir(self.topdir)
|
||||
self._topcollector.parent = None
|
||||
|
||||
def _normalizearg(self, arg):
|
||||
return "::".join(self._parsearg(arg))
|
||||
|
||||
def _parsearg(self, arg, base=None):
|
||||
""" return normalized name list for a command line specified id
|
||||
which might be of the form x/y/z::name1::name2
|
||||
and should result into the form x::y::z::name1::name2
|
||||
"""
|
||||
if base is None:
|
||||
base = py.path.local()
|
||||
parts = str(arg).split("::")
|
||||
path = base.join(parts[0], abs=True)
|
||||
if not path.check():
|
||||
raise pytest.UsageError("file not found: %s" %(path,))
|
||||
topdir = self.topdir
|
||||
if path != topdir and not path.relto(topdir):
|
||||
raise pytest.UsageError("path %r is not relative to %r" %
|
||||
(str(path), str(topdir)))
|
||||
topparts = path.relto(topdir).split(path.sep)
|
||||
return topparts + parts[1:]
|
||||
|
||||
def getid(self, node):
|
||||
""" return id for node, relative to topdir. """
|
||||
path = node.fspath
|
||||
chain = [x for x in node.listchain() if x.fspath == path]
|
||||
chain = chain[1:]
|
||||
names = [x.name for x in chain if x.name != "()"]
|
||||
relpath = path.relto(self.topdir)
|
||||
if not relpath:
|
||||
assert path == self.topdir
|
||||
path = ''
|
||||
else:
|
||||
path = relpath
|
||||
if os.sep != "/":
|
||||
path = str(path).replace(os.sep, "/")
|
||||
names.insert(0, path)
|
||||
return "::".join(names)
|
||||
|
||||
def getbyid(self, id):
|
||||
""" return one or more nodes matching the id. """
|
||||
names = [x for x in id.split("::") if x]
|
||||
if names and '/' in names[0]:
|
||||
names[:1] = names[0].split("/")
|
||||
return list(self.matchnodes([self._topcollector], names))
|
||||
|
||||
def perform_collect(self):
|
||||
items = []
|
||||
for arg in self.config.args:
|
||||
names = self._parsearg(arg)
|
||||
try:
|
||||
for node in self.matchnodes([self._topcollector], names):
|
||||
items.extend(self.genitems(node))
|
||||
except NoMatch:
|
||||
raise pytest.UsageError("can't collect: %s" % (arg,))
|
||||
return items
|
||||
|
||||
def matchnodes(self, matching, names):
|
||||
if not matching:
|
||||
return
|
||||
if not names:
|
||||
for x in matching:
|
||||
yield x
|
||||
return
|
||||
name = names[0]
|
||||
names = names[1:]
|
||||
for node in matching:
|
||||
if isinstance(node, pytest.collect.Item):
|
||||
if not name:
|
||||
yield node
|
||||
continue
|
||||
assert isinstance(node, pytest.collect.Collector)
|
||||
node.ihook.pytest_collectstart(collector=node)
|
||||
rep = node.ihook.pytest_make_collect_report(collector=node)
|
||||
#print "matching", rep.result, "against name", name
|
||||
if rep.passed:
|
||||
if not name:
|
||||
for x in rep.result:
|
||||
yield x
|
||||
else:
|
||||
matched = False
|
||||
for x in rep.result:
|
||||
try:
|
||||
if x.name == name or x.fspath.basename == name:
|
||||
for x in self.matchnodes([x], names):
|
||||
yield x
|
||||
matched = True
|
||||
elif x.name == "()": # XXX special Instance() case
|
||||
for x in self.matchnodes([x], [name] + names):
|
||||
yield x
|
||||
matched = True
|
||||
except NoMatch:
|
||||
pass
|
||||
if not matched:
|
||||
node.ihook.pytest_collectreport(report=rep)
|
||||
raise NoMatch(name)
|
||||
node.ihook.pytest_collectreport(report=rep)
|
||||
|
||||
def genitems(self, node):
|
||||
if isinstance(node, pytest.collect.Item):
|
||||
node.ihook.pytest_itemcollected(item=node)
|
||||
yield node
|
||||
else:
|
||||
assert isinstance(node, pytest.collect.Collector)
|
||||
node.ihook.pytest_collectstart(collector=node)
|
||||
rep = node.ihook.pytest_make_collect_report(collector=node)
|
||||
if rep.passed:
|
||||
for subnode in rep.result:
|
||||
for x in self.genitems(subnode):
|
||||
yield x
|
||||
node.ihook.pytest_collectreport(report=rep)
|
||||
|
||||
class NoMatch(Exception):
|
||||
""" raised if matching cannot locate a matching names. """
|
||||
|
||||
def gettopdir(args):
|
||||
""" return the top directory for the given paths.
|
||||
if the common base dir resides in a python package
|
||||
parent directory of the root package is returned.
|
||||
"""
|
||||
fsargs = [py.path.local(decodearg(arg)[0]) for arg in args]
|
||||
p = fsargs and fsargs[0] or None
|
||||
for x in fsargs[1:]:
|
||||
p = p.common(x)
|
||||
assert p, "cannot determine common basedir of %s" %(fsargs,)
|
||||
pkgdir = p.pypkgpath()
|
||||
if pkgdir is None:
|
||||
if p.check(file=1):
|
||||
p = p.dirpath()
|
||||
return p
|
||||
else:
|
||||
return pkgdir.dirpath()
|
||||
|
||||
def decodearg(arg):
|
||||
arg = str(arg)
|
||||
return arg.split("::")
|
||||
|
||||
class HookProxy:
|
||||
def __init__(self, node):
|
||||
self.node = node
|
||||
def __init__(self, fspath, config):
|
||||
self.fspath = fspath
|
||||
self.config = config
|
||||
def __getattr__(self, name):
|
||||
hookmethod = getattr(self.node.config.hook, name)
|
||||
hookmethod = getattr(self.config.hook, name)
|
||||
def call_matching_hooks(**kwargs):
|
||||
plugins = self.node.config._getmatchingplugins(self.node.fspath)
|
||||
plugins = self.config._getmatchingplugins(self.fspath)
|
||||
return hookmethod.pcall(plugins, **kwargs)
|
||||
return call_matching_hooks
|
||||
|
||||
@@ -329,7 +179,7 @@ class Node(object):
|
||||
|
||||
#: the file where this item is contained/collected from.
|
||||
self.fspath = getattr(parent, 'fspath', None)
|
||||
self.ihook = HookProxy(self)
|
||||
self.ihook = self.collection.gethookproxy(self.fspath)
|
||||
self.keywords = {self.name: True}
|
||||
|
||||
Module = compatproperty("Module")
|
||||
@@ -339,14 +189,19 @@ class Node(object):
|
||||
Item = compatproperty("Item")
|
||||
|
||||
def __repr__(self):
|
||||
if getattr(self.config.option, 'debug', False):
|
||||
return "<%s %r %0x>" %(self.__class__.__name__,
|
||||
getattr(self, 'name', None), id(self))
|
||||
else:
|
||||
return "<%s %r>" %(self.__class__.__name__,
|
||||
getattr(self, 'name', None))
|
||||
return "<%s %r>" %(self.__class__.__name__, getattr(self, 'name', None))
|
||||
|
||||
# methods for ordering nodes
|
||||
@property
|
||||
def nodeid(self):
|
||||
try:
|
||||
return self._nodeid
|
||||
except AttributeError:
|
||||
self._nodeid = x = self._makeid()
|
||||
return x
|
||||
|
||||
def _makeid(self):
|
||||
return self.parent.nodeid + "::" + self.name
|
||||
|
||||
def __eq__(self, other):
|
||||
if not isinstance(other, Node):
|
||||
@@ -447,7 +302,7 @@ class Collector(Node):
|
||||
|
||||
def _memocollect(self):
|
||||
""" internal helper method to cache results of calling collect(). """
|
||||
return self._memoizedcall('_collected', self.collect)
|
||||
return self._memoizedcall('_collected', lambda: list(self.collect()))
|
||||
|
||||
def _prunetraceback(self, excinfo):
|
||||
if hasattr(self, 'fspath'):
|
||||
@@ -460,55 +315,177 @@ class Collector(Node):
|
||||
|
||||
class FSCollector(Collector):
|
||||
def __init__(self, fspath, parent=None, config=None, collection=None):
|
||||
fspath = py.path.local(fspath)
|
||||
super(FSCollector, self).__init__(fspath.basename,
|
||||
parent, config, collection)
|
||||
fspath = py.path.local(fspath) # xxx only for test_resultlog.py?
|
||||
name = parent and fspath.relto(parent.fspath) or fspath.basename
|
||||
super(FSCollector, self).__init__(name, parent, config, collection)
|
||||
self.fspath = fspath
|
||||
|
||||
def _makeid(self):
|
||||
if self == self.collection:
|
||||
return "."
|
||||
relpath = self.collection.fspath.bestrelpath(self.fspath)
|
||||
if os.sep != "/":
|
||||
relpath = str(path).replace(os.sep, "/")
|
||||
return relpath
|
||||
|
||||
class File(FSCollector):
|
||||
""" base class for collecting tests from a file. """
|
||||
|
||||
class Directory(FSCollector):
|
||||
def collect(self):
|
||||
l = []
|
||||
for path in self.fspath.listdir(sort=True):
|
||||
res = self.consider(path)
|
||||
if res is not None:
|
||||
if isinstance(res, (list, tuple)):
|
||||
l.extend(res)
|
||||
else:
|
||||
l.append(res)
|
||||
return l
|
||||
|
||||
def consider(self, path):
|
||||
if self.ihook.pytest_ignore_collect(path=path, config=self.config):
|
||||
return
|
||||
if path.check(file=1):
|
||||
res = self.consider_file(path)
|
||||
elif path.check(dir=1):
|
||||
res = self.consider_dir(path)
|
||||
else:
|
||||
res = None
|
||||
if isinstance(res, list):
|
||||
# throw out identical results
|
||||
l = []
|
||||
for x in res:
|
||||
if x not in l:
|
||||
assert x.parent == self, (x.parent, self)
|
||||
assert x.fspath == path, (x.fspath, path)
|
||||
l.append(x)
|
||||
res = l
|
||||
return res
|
||||
|
||||
def consider_file(self, path):
|
||||
return self.ihook.pytest_collect_file(path=path, parent=self)
|
||||
|
||||
def consider_dir(self, path):
|
||||
return self.ihook.pytest_collect_directory(path=path, parent=self)
|
||||
|
||||
class Item(Node):
|
||||
""" a basic test invocation item. Note that for a single function
|
||||
there might be multiple test invocation items.
|
||||
"""
|
||||
def reportinfo(self):
|
||||
return self.fspath, None, ""
|
||||
|
||||
@property
|
||||
def location(self):
|
||||
try:
|
||||
return self._location
|
||||
except AttributeError:
|
||||
location = self.reportinfo()
|
||||
location = (str(location[0]), location[1], str(location[2]))
|
||||
self._location = location
|
||||
return location
|
||||
|
||||
class Collection(FSCollector):
|
||||
def __init__(self, config):
|
||||
super(Collection, self).__init__(py.path.local(), parent=None,
|
||||
config=config, collection=self)
|
||||
self.trace = config.trace.root.get("collection")
|
||||
self._norecursepatterns = config.getini("norecursedirs")
|
||||
|
||||
def isinitpath(self, path):
|
||||
return path in self._initialpaths
|
||||
|
||||
def gethookproxy(self, fspath):
|
||||
return HookProxy(fspath, self.config)
|
||||
|
||||
def perform_collect(self, args=None, genitems=True):
|
||||
if args is None:
|
||||
args = self.config.args
|
||||
self.trace("perform_collect", self, args)
|
||||
self.trace.root.indent += 1
|
||||
self._notfound = []
|
||||
self._initialpaths = set()
|
||||
self._initialargs = args
|
||||
for arg in args:
|
||||
parts = self._parsearg(arg)
|
||||
self._initialpaths.add(parts[0])
|
||||
self.ihook.pytest_collectstart(collector=self)
|
||||
rep = self.ihook.pytest_make_collect_report(collector=self)
|
||||
self.ihook.pytest_collectreport(report=rep)
|
||||
self.trace.root.indent -= 1
|
||||
if self._notfound:
|
||||
for arg, exc in self._notfound:
|
||||
line = "no name %r in any of %r" % (exc.args[1], exc.args[0])
|
||||
raise pytest.UsageError("not found: %s\n%s" %(arg, line))
|
||||
if not genitems:
|
||||
return rep.result
|
||||
else:
|
||||
self.items = items = []
|
||||
if rep.passed:
|
||||
for node in rep.result:
|
||||
self.items.extend(self.genitems(node))
|
||||
return items
|
||||
|
||||
def collect(self):
|
||||
for arg in self._initialargs:
|
||||
self.trace("processing arg", arg)
|
||||
self.trace.root.indent += 1
|
||||
try:
|
||||
for x in self._collect(arg):
|
||||
yield x
|
||||
except NoMatch:
|
||||
# we are inside a make_report hook so
|
||||
# we cannot directly pass through the exception
|
||||
self._notfound.append((arg, sys.exc_info()[1]))
|
||||
self.trace.root.indent -= 1
|
||||
break
|
||||
self.trace.root.indent -= 1
|
||||
|
||||
def _collect(self, arg):
|
||||
names = self._parsearg(arg)
|
||||
path = names.pop(0)
|
||||
if path.check(dir=1):
|
||||
assert not names, "invalid arg %r" %(arg,)
|
||||
for path in path.visit(rec=self._recurse, bf=True, sort=True):
|
||||
for x in self._collectfile(path):
|
||||
yield x
|
||||
else:
|
||||
assert path.check(file=1)
|
||||
for x in self.matchnodes(self._collectfile(path), names):
|
||||
yield x
|
||||
|
||||
def _collectfile(self, path):
|
||||
ihook = self.gethookproxy(path)
|
||||
if ihook.pytest_ignore_collect(path=path, config=self.config):
|
||||
return ()
|
||||
return ihook.pytest_collect_file(path=path, parent=self)
|
||||
|
||||
def _recurse(self, path):
|
||||
ihook = self.gethookproxy(path)
|
||||
if ihook.pytest_ignore_collect(path=path, config=self.config):
|
||||
return
|
||||
for pat in self._norecursepatterns:
|
||||
if path.check(fnmatch=pat):
|
||||
return False
|
||||
ihook.pytest_collect_directory(path=path, parent=self)
|
||||
return True
|
||||
|
||||
def _parsearg(self, arg):
|
||||
""" return (fspath, names) tuple after checking the file exists. """
|
||||
parts = str(arg).split("::")
|
||||
path = self.fspath.join(parts[0], abs=True)
|
||||
if not path.check():
|
||||
raise pytest.UsageError("file not found: %s" %(path,))
|
||||
parts[0] = path
|
||||
return parts
|
||||
|
||||
def matchnodes(self, matching, names):
|
||||
self.trace("matchnodes", matching, names)
|
||||
self.trace.root.indent += 1
|
||||
nodes = self._matchnodes(matching, names)
|
||||
num = len(nodes)
|
||||
self.trace("matchnodes finished -> ", num, "nodes")
|
||||
self.trace.root.indent -= 1
|
||||
if num == 0:
|
||||
raise NoMatch(matching, names[:1])
|
||||
return nodes
|
||||
|
||||
def _matchnodes(self, matching, names):
|
||||
if not matching or not names:
|
||||
return matching
|
||||
name = names[0]
|
||||
assert name
|
||||
nextnames = names[1:]
|
||||
resultnodes = []
|
||||
for node in matching:
|
||||
if isinstance(node, pytest.collect.Item):
|
||||
resultnodes.append(node)
|
||||
continue
|
||||
assert isinstance(node, pytest.collect.Collector)
|
||||
node.ihook.pytest_collectstart(collector=node)
|
||||
rep = node.ihook.pytest_make_collect_report(collector=node)
|
||||
if rep.passed:
|
||||
for x in rep.result:
|
||||
if x.name == name:
|
||||
resultnodes.extend(self.matchnodes([x], nextnames))
|
||||
node.ihook.pytest_collectreport(report=rep)
|
||||
return resultnodes
|
||||
|
||||
def genitems(self, node):
|
||||
self.trace("genitems", node)
|
||||
if isinstance(node, pytest.collect.Item):
|
||||
node.ihook.pytest_itemcollected(item=node)
|
||||
yield node
|
||||
else:
|
||||
assert isinstance(node, pytest.collect.Collector)
|
||||
node.ihook.pytest_collectstart(collector=node)
|
||||
rep = node.ihook.pytest_make_collect_report(collector=node)
|
||||
if rep.passed:
|
||||
for subnode in rep.result:
|
||||
for x in self.genitems(subnode):
|
||||
yield x
|
||||
node.ihook.pytest_collectreport(report=rep)
|
||||
|
||||
|
||||
@@ -115,10 +115,10 @@ class TerminalReporter:
|
||||
def write_fspath_result(self, fspath, res):
|
||||
if fspath != self.currentfspath:
|
||||
self.currentfspath = fspath
|
||||
fspath = self.curdir.bestrelpath(fspath)
|
||||
#fspath = self.curdir.bestrelpath(fspath)
|
||||
self._tw.line()
|
||||
relpath = self.curdir.bestrelpath(fspath)
|
||||
self._tw.write(relpath + " ")
|
||||
#relpath = self.curdir.bestrelpath(fspath)
|
||||
self._tw.write(fspath + " ")
|
||||
self._tw.write(res)
|
||||
|
||||
def write_ensure_prefix(self, prefix, extra="", **kwargs):
|
||||
@@ -163,14 +163,15 @@ class TerminalReporter:
|
||||
def pytest__teardown_final_logerror(self, report):
|
||||
self.stats.setdefault("error", []).append(report)
|
||||
|
||||
def pytest_runtest_logstart(self, nodeid, location, fspath):
|
||||
def pytest_runtest_logstart(self, nodeid, location):
|
||||
# ensure that the path is printed before the
|
||||
# 1st test of a module starts running
|
||||
fspath = nodeid.split("::")[0]
|
||||
if self.showlongtestinfo:
|
||||
line = self._locationline(fspath, *location)
|
||||
self.write_ensure_prefix(line, "")
|
||||
elif self.showfspath:
|
||||
self.write_fspath_result(py.path.local(fspath), "")
|
||||
self.write_fspath_result(fspath, "")
|
||||
|
||||
def pytest_runtest_logreport(self, report):
|
||||
rep = report
|
||||
|
||||
Reference in New Issue
Block a user