reorder internal layout so that funcarg-related functionality is in python.py

This commit is contained in:
holger krekel
2012-08-01 09:23:39 +02:00
parent 4f94ab4e42
commit cb2eb9ba33
3 changed files with 361 additions and 387 deletions

View File

@@ -3,8 +3,10 @@ import py
import inspect
import sys
import pytest
from _pytest.main import getfslineno, getfuncargnames, readscope
from _pytest.main import getfslineno
from _pytest.monkeypatch import monkeypatch
from py._code.code import TerminalRepr
from _pytest.mark import MarkInfo
import _pytest
cutdir = py.path.local(_pytest.__file__).dirpath()
@@ -69,6 +71,8 @@ def pytest_configure(config):
"test function, one with arg1=1 and another with arg1=2."
)
def pytest_sessionstart(session):
session.funcargmanager = FuncargManager(session)
@pytest.mark.trylast
def pytest_namespace():
@@ -1123,3 +1127,355 @@ def slice_kwargs(names, kwargs):
new_kwargs[name] = kwargs[name]
return new_kwargs
class FuncargLookupError(LookupError):
""" could not find a factory. """
def __init__(self, function, msg):
self.function = function
self.msg = msg
class FuncargLookupErrorRepr(TerminalRepr):
def __init__(self, filename, firstlineno, deflines, errorstring):
self.deflines = deflines
self.errorstring = errorstring
self.filename = filename
self.firstlineno = firstlineno
def toterminal(self, tw):
tw.line()
for line in self.deflines:
tw.line(" " + line.strip())
for line in self.errorstring.split("\n"):
tw.line(" " + line.strip(), red=True)
tw.line()
tw.line("%s:%d" % (self.filename, self.firstlineno+1))
class FuncargManager:
_argprefix = "pytest_funcarg__"
FuncargLookupError = FuncargLookupError
FuncargLookupErrorRepr = FuncargLookupErrorRepr
def __init__(self, session):
self.session = session
self.config = session.config
self.arg2facspec = {}
session.config.pluginmanager.register(self, "funcmanage")
self._holderobjseen = set()
self.setuplist = []
self._arg2finish = {}
### XXX this hook should be called for historic events like pytest_configure
### so that we don't have to do the below pytest_collection hook
def pytest_plugin_registered(self, plugin):
#print "plugin_registered", plugin
nodeid = ""
try:
p = py.path.local(plugin.__file__)
except AttributeError:
pass
else:
if p.basename.startswith("conftest.py"):
nodeid = p.dirpath().relto(self.session.fspath)
self._parsefactories(plugin, nodeid)
@pytest.mark.tryfirst
def pytest_collection(self, session):
plugins = session.config.pluginmanager.getplugins()
for plugin in plugins:
self.pytest_plugin_registered(plugin)
def pytest_generate_tests(self, metafunc):
funcargnames = list(metafunc.funcargnames)
_, allargnames = self.getsetuplist(metafunc.parentid)
#print "setuplist, allargnames", setuplist, allargnames
funcargnames.extend(allargnames)
seen = set()
while funcargnames:
argname = funcargnames.pop(0)
if argname in seen or argname == "request":
continue
seen.add(argname)
faclist = self.getfactorylist(argname, metafunc.parentid,
metafunc.function, raising=False)
if faclist is None:
continue # will raise FuncargLookupError at setup time
for facdef in faclist:
if facdef.params is not None:
metafunc.parametrize(argname, facdef.params, indirect=True,
scope=facdef.scope)
funcargnames.extend(facdef.funcargnames)
def pytest_collection_modifyitems(self, items):
# separate parametrized setups
items[:] = parametrize_sorted(items, set(), {}, 0)
def pytest_runtest_teardown(self, item, nextitem):
try:
cs1 = item.callspec
except AttributeError:
return
for name in cs1.params:
try:
if name in nextitem.callspec.params and \
cs1.params[name] == nextitem.callspec.params[name]:
continue
except AttributeError:
pass
key = (name, cs1.params[name])
item.session._setupstate._callfinalizers(key)
l = self._arg2finish.get(name)
if l is not None:
for fin in l:
fin()
def _parsefactories(self, holderobj, nodeid):
if holderobj in self._holderobjseen:
return
#print "parsefactories", holderobj
self._holderobjseen.add(holderobj)
for name in dir(holderobj):
#print "check", holderobj, name
obj = getattr(holderobj, name)
if not callable(obj):
continue
# funcarg factories either have a pytest_funcarg__ prefix
# or are "funcarg" marked
if not callable(obj):
continue
marker = getattr(obj, "funcarg", None)
if marker is not None and isinstance(marker, MarkInfo):
assert not name.startswith(self._argprefix)
argname = name
scope = marker.kwargs.get("scope")
params = marker.kwargs.get("params")
elif name.startswith(self._argprefix):
argname = name[len(self._argprefix):]
scope = None
params = None
else:
# no funcargs. check if we have a setup function.
setup = getattr(obj, "setup", None)
if setup is not None and isinstance(setup, MarkInfo):
scope = setup.kwargs.get("scope")
sf = SetupCall(self, nodeid, obj, scope)
self.setuplist.append(sf)
continue
faclist = self.arg2facspec.setdefault(argname, [])
factorydef = FactoryDef(self, nodeid, argname, obj, scope, params)
faclist.append(factorydef)
### check scope/params mismatch?
def getsetuplist(self, nodeid):
l = []
allargnames = set()
for setupcall in self.setuplist:
if nodeid.startswith(setupcall.baseid):
l.append(setupcall)
allargnames.update(setupcall.funcargnames)
return l, allargnames
def getfactorylist(self, argname, nodeid, function, raising=True):
try:
factorydeflist = self.arg2facspec[argname]
except KeyError:
if raising:
self._raiselookupfailed(argname, function, nodeid)
else:
return self._matchfactories(factorydeflist, nodeid)
def _matchfactories(self, factorydeflist, nodeid):
l = []
for factorydef in factorydeflist:
#print "check", basepath, nodeid
if nodeid.startswith(factorydef.baseid):
l.append(factorydef)
return l
def _raiselookupfailed(self, argname, function, nodeid):
available = []
for name, facdef in self.arg2facspec.items():
faclist = self._matchfactories(facdef, nodeid)
if faclist:
available.append(name)
msg = "LookupError: no factory found for argument %r" % (argname,)
msg += "\n available funcargs: %s" %(", ".join(available),)
msg += "\n use 'py.test --funcargs [testpath]' for help on them."
raise FuncargLookupError(function, msg)
def ensure_setupcalls(self, request):
setuplist, allnames = self.getsetuplist(request._pyfuncitem.nodeid)
for setupcall in setuplist:
if setupcall.active:
continue
setuprequest = SetupRequest(request, setupcall)
kwargs = {}
for name in setupcall.funcargnames:
if name == "request":
kwargs[name] = setuprequest
else:
kwargs[name] = request.getfuncargvalue(name)
scope = setupcall.scope or "function"
scol = setupcall.scopeitem = request._getscopeitem(scope)
self.session._setupstate.addfinalizer(setupcall.finish, scol)
for argname in setupcall.funcargnames: # XXX all deps?
self.addargfinalizer(setupcall.finish, argname)
setupcall.execute(kwargs)
def addargfinalizer(self, finalizer, argname):
l = self._arg2finish.setdefault(argname, [])
l.append(finalizer)
def removefinalizer(self, finalizer):
for l in self._arg2finish.values():
try:
l.remove(finalizer)
except ValueError:
pass
def rprop(attr, doc=None):
if doc is None:
doc = "%r of underlying test item"
return property(lambda x: getattr(x._request, attr), doc=doc)
class SetupRequest:
def __init__(self, request, setupcall):
self._request = request
self._setupcall = setupcall
self._finalizers = []
# no getfuncargvalue(), cached_setup, applymarker helpers here
# on purpose
function = rprop("function")
cls = rprop("cls")
instance = rprop("instance")
fspath = rprop("fspath")
keywords = rprop("keywords")
config = rprop("config", "pytest config object.")
def addfinalizer(self, finalizer):
self._setupcall.addfinalizer(finalizer)
class SetupCall:
""" a container/helper for managing calls to setup functions. """
def __init__(self, funcargmanager, baseid, func, scope):
self.funcargmanager = funcargmanager
self.baseid = baseid
self.func = func
self.funcargnames = getfuncargnames(func)
self.scope = scope
self.active = False
self._finalizer = []
def execute(self, kwargs):
assert not self.active
self.active = True
self.func(**kwargs)
def addfinalizer(self, finalizer):
assert self.active
self._finalizer.append(finalizer)
def finish(self):
while self._finalizer:
func = self._finalizer.pop()
func()
# check neccesity of next commented call
self.funcargmanager.removefinalizer(self.finish)
self.active = False
class FactoryDef:
""" A container for a factory definition. """
def __init__(self, funcargmanager, baseid, argname, func, scope, params):
self.funcargmanager = funcargmanager
self.baseid = baseid
self.func = func
self.argname = argname
self.scope = scope
self.params = params
self.funcargnames = getfuncargnames(func)
def getfuncargnames(function, startindex=None):
# XXX merge with main.py's varnames
argnames = py.std.inspect.getargs(py.code.getrawcode(function))[0]
if startindex is None:
startindex = py.std.inspect.ismethod(function) and 1 or 0
defaults = getattr(function, 'func_defaults',
getattr(function, '__defaults__', None)) or ()
numdefaults = len(defaults)
if numdefaults:
return argnames[startindex:-numdefaults]
return argnames[startindex:]
# algorithm for sorting on a per-parametrized resource setup basis
def parametrize_sorted(items, ignore, cache, scopenum):
if scopenum >= 3:
return items
newitems = []
olditems = []
slicing_argparam = None
for item in items:
argparamlist = getfuncargparams(item, ignore, scopenum, cache)
if slicing_argparam is None and argparamlist:
slicing_argparam = argparamlist[0]
slicing_index = len(olditems)
if slicing_argparam in argparamlist:
newitems.append(item)
else:
olditems.append(item)
if newitems:
newignore = ignore.copy()
newignore.add(slicing_argparam)
newitems = parametrize_sorted(newitems + olditems[slicing_index:],
newignore, cache, scopenum)
old1 = parametrize_sorted(olditems[:slicing_index], newignore,
cache, scopenum+1)
return old1 + newitems
else:
olditems = parametrize_sorted(olditems, ignore, cache, scopenum+1)
return olditems + newitems
def getfuncargparams(item, ignore, scopenum, cache):
""" return list of (arg,param) tuple, sorted by broader scope first. """
assert scopenum < 3 # function
try:
cs = item.callspec
except AttributeError:
return []
if scopenum == 0:
argparams = [x for x in cs.params.items() if x not in ignore
and cs._arg2scopenum[x[0]] == scopenum]
elif scopenum == 1: # module
argparams = []
for argname, param in cs.params.items():
if cs._arg2scopenum[argname] == scopenum:
key = (argname, param, item.fspath)
if key in ignore:
continue
argparams.append(key)
elif scopenum == 2: # class
argparams = []
for argname, param in cs.params.items():
if cs._arg2scopenum[argname] == scopenum:
l = cache.setdefault(item.fspath, [])
try:
i = l.index(item.cls)
except ValueError:
i = len(l)
l.append(item.cls)
key = (argname, param, item.fspath, i)
if key in ignore:
continue
argparams.append(key)
#elif scopenum == 3:
# argparams = []
# for argname, param in cs.params.items():
# if cs._arg2scopenum[argname] == scopenum:
# key = (argname, param, getfslineno(item.obj))
# if key in ignore:
# continue
# argparams.append(key)
return argparams