vastly simplify and cleanup collection initialization by internally

introducing a RootCollector. Note that the internal node
methods _fromtrail and _totrail are shifted to the still internal
config._rootcol.fromtrail/totrail

--HG--
branch : trunk
This commit is contained in:
holger krekel
2010-01-03 01:02:44 +01:00
parent eebeb1b257
commit 1b34492108
9 changed files with 127 additions and 165 deletions

View File

@@ -115,7 +115,7 @@ class Node(object):
l = [self]
while 1:
x = l[-1]
if x.parent is not None:
if x.parent is not None and x.parent.parent is not None:
l.append(x.parent)
else:
if not rootfirst:
@@ -130,62 +130,7 @@ class Node(object):
while current and not isinstance(current, cls):
current = current.parent
return current
def _getitembynames(self, namelist):
cur = self
for name in namelist:
if name:
next = cur.collect_by_name(name)
if next is None:
existingnames = [x.name for x in self._memocollect()]
msg = ("Collector %r does not have name %r "
"existing names are: %s" %
(cur, name, existingnames))
raise AssertionError(msg)
cur = next
return cur
def _getfsnode(self, path):
# this method is usually called from
# config.getfsnode() which returns a colitem
# from filename arguments
#
# pytest's collector tree does not neccessarily
# follow the filesystem and we thus need to do
# some special matching code here because
# _getitembynames() works by colitem names, not
# basenames.
if path == self.fspath:
return self
basenames = path.relto(self.fspath).split(path.sep)
cur = self
while basenames:
basename = basenames.pop(0)
assert basename
fspath = cur.fspath.join(basename)
colitems = cur._memocollect()
l = []
for colitem in colitems:
if colitem.fspath == fspath or colitem.name == basename:
l.append(colitem)
if not l:
raise self.config.Error("can't collect: %s" %(fspath,))
if basenames:
if len(l) > 1:
msg = ("Collector %r has more than one %r colitem "
"existing colitems are: %s" %
(cur, fspath, colitems))
raise self.config.Error("xxx-too many test types for: %s" % (fspath, ))
cur = l[0]
else:
if len(l) > 1:
cur = l
else:
cur = l[0]
break
return cur
def readkeywords(self):
return dict([(x, True) for x in self._keywords()])
@@ -228,30 +173,6 @@ class Node(object):
def _prunetraceback(self, traceback):
return traceback
def _totrail(self):
""" provide a trail relative to the topdir,
which can be used to reconstruct the
collector (possibly on a different host
starting from a different topdir).
"""
chain = self.listchain()
topdir = self.config.topdir
relpath = chain[0].fspath.relto(topdir)
if not relpath:
if chain[0].fspath == topdir:
relpath = "."
else:
raise ValueError("%r not relative to topdir %s"
%(chain[0].fspath, topdir))
return relpath, tuple([x.name for x in chain[1:]])
def _fromtrail(trail, config):
relpath, names = trail
fspath = config.topdir.join(relpath)
col = config.getfsnode(fspath)
return col._getitembynames(names)
_fromtrail = staticmethod(_fromtrail)
def _repr_failure_py(self, excinfo):
excinfo.traceback = self._prunetraceback(excinfo.traceback)
# XXX should excinfo.getrepr record all data and toterminal()
@@ -347,30 +268,15 @@ class FSCollector(Collector):
self.fspath = fspath
def __getstate__(self):
if self.parent is None:
# the root node needs to pickle more context info
topdir = self.config.topdir
relpath = self.fspath.relto(topdir)
if not relpath:
if self.fspath == topdir:
relpath = "."
else:
raise ValueError("%r not relative to topdir %s"
%(self.fspath, topdir))
return (self.name, self.config, relpath)
if isinstance(self.parent, RootCollector):
relpath = self.parent._getrelpath(self.fspath)
return (relpath, self.parent)
else:
return (self.name, self.parent)
def __setstate__(self, picklestate):
if len(picklestate) == 3:
# root node
name, config, relpath = picklestate
fspath = config.topdir.join(relpath)
fsnode = config.getfsnode(fspath)
self.__dict__.update(fsnode.__dict__)
else:
name, parent = picklestate
self.__init__(parent.fspath.join(name), parent=parent)
name, parent = picklestate
self.__init__(parent.fspath.join(name), parent=parent)
class File(FSCollector):
""" base class for collecting tests from a file. """
@@ -421,7 +327,8 @@ class Directory(FSCollector):
l = []
for x in res:
if x not in l:
assert x.parent == self, "wrong collection tree construction"
assert x.parent == self, (x.parent, self)
assert x.fspath == path, (x.fspath, path)
l.append(x)
res = l
return res
@@ -468,3 +375,67 @@ def warnoldtestrun(function=None):
"implement item.runtest() instead of "
"item.run() and item.execute()",
stacklevel=2, function=function)
class RootCollector(Directory):
def __init__(self, config):
Directory.__init__(self, config.topdir, parent=None, config=config)
self.name = None
def getfsnode(self, path):
path = py.path.local(path)
if not path.check():
raise self.config.Error("file not found: %s" %(path,))
topdir = self.config.topdir
if path != topdir and not path.relto(topdir):
raise self.config.Error("path %r is not relative to %r" %
(str(path), str(self.fspath)))
# assumtion: pytest's fs-collector tree follows the filesystem tree
basenames = filter(None, path.relto(topdir).split(path.sep))
try:
return self.getbynames(basenames)
except ValueError:
raise self.config.Error("can't collect: %s" % str(path))
def getbynames(self, names):
current = self.consider(self.config.topdir)
for name in names:
if name == ".": # special "identity" name
continue
l = []
for x in current._memocollect():
if x.name == name:
l.append(x)
elif x.fspath == current.fspath.join(name):
l.append(x)
if not l:
raise ValueError("no node named %r in %r" %(name, current))
current = l[0]
return current
def totrail(self, node):
chain = node.listchain()
names = [self._getrelpath(chain[0].fspath)]
names += [x.name for x in chain[1:]]
return names
def fromtrail(self, trail):
return self.config._rootcol.getbynames(trail)
def _getrelpath(self, fspath):
topdir = self.config.topdir
relpath = fspath.relto(topdir)
if not relpath:
if fspath == topdir:
relpath = "."
else:
raise ValueError("%r not relative to topdir %s"
%(self.fspath, topdir))
return relpath
def __getstate__(self):
return self.config
def __setstate__(self, config):
self.__init__(config)

View File

@@ -2,6 +2,7 @@ import py, os
from py.impl.test.conftesthandle import Conftest
from py.impl.test.pluginmanager import PluginManager
from py.impl.test import parseopt
from py.impl.test.collect import RootCollector
def ensuretemp(string, dir=1):
""" (deprecated) return temporary directory path with
@@ -97,6 +98,7 @@ class Config(object):
if not args:
args.append(py.std.os.getcwd())
self.topdir = gettopdir(args)
self._rootcol = RootCollector(config=self)
self.args = [py.path.local(x) for x in args]
# config objects are usually pickled across system
@@ -117,6 +119,7 @@ class Config(object):
py.test.config = self
# next line will registers default plugins
self.__init__(topdir=py.path.local())
self._rootcol = RootCollector(config=self)
args, cmdlineopts = repr
args = [self.topdir.join(x) for x in args]
self.option = cmdlineopts
@@ -150,17 +153,7 @@ class Config(object):
return [self.getfsnode(arg) for arg in self.args]
def getfsnode(self, path):
path = py.path.local(path)
if not path.check():
raise self.Error("file not found: %s" %(path,))
# we want our possibly custom collection tree to start at pkgroot
pkgpath = path.pypkgpath()
if pkgpath is None:
pkgpath = path.check(file=1) and path.dirpath() or path
tmpcol = py.test.collect.Directory(pkgpath, config=self)
col = tmpcol.ihook.pytest_collect_directory(path=pkgpath, parent=tmpcol)
col.parent = None
return col._getfsnode(path)
return self._rootcol.getfsnode(path)
def _getcollectclass(self, name, path):
try:

View File

@@ -136,8 +136,8 @@ def slave_runsession(channel, config, fullwidth, hasmarkup):
colitems = []
for trail in trails:
try:
colitem = py.test.collect.Collector._fromtrail(trail, config)
except AssertionError:
colitem = config._rootcol.fromtrail(trail)
except ValueError:
#XXX send info for "test disappeared" or so
continue
colitems.append(colitem)
@@ -159,4 +159,5 @@ def slave_runsession(channel, config, fullwidth, hasmarkup):
session.config.hook.pytest_looponfailinfo(
failreports=list(failreports),
rootdirs=[config.topdir])
channel.send([rep.getnode()._totrail() for rep in failreports])
rootcol = session.config._rootcol
channel.send([rootcol.totrail(rep.getnode()) for rep in failreports])