202 lines
		
	
	
		
			7.4 KiB
		
	
	
	
		
			Python
		
	
	
	
			
		
		
	
	
			202 lines
		
	
	
		
			7.4 KiB
		
	
	
	
		
			Python
		
	
	
	
import py
 | 
						|
 | 
						|
def getfuncargnames(function):
 | 
						|
    argnames = py.std.inspect.getargs(py.code.getrawcode(function))[0]
 | 
						|
    startindex = py.std.inspect.ismethod(function) and 1 or 0
 | 
						|
    defaults = getattr(function, 'func_defaults', 
 | 
						|
                       getattr(function, '__defaults__', None)) or ()
 | 
						|
    numdefaults = len(defaults)
 | 
						|
    if numdefaults:
 | 
						|
        return argnames[startindex:-numdefaults]
 | 
						|
    return argnames[startindex:]
 | 
						|
    
 | 
						|
def fillfuncargs(function):
 | 
						|
    """ fill missing funcargs. """ 
 | 
						|
    request = FuncargRequest(pyfuncitem=function)
 | 
						|
    request._fillfuncargs()
 | 
						|
 | 
						|
def getplugins(node, withpy=False): # might by any node
 | 
						|
    plugins = node.config._getmatchingplugins(node.fspath)
 | 
						|
    if withpy:
 | 
						|
        mod = node.getparent(py.test.collect.Module)
 | 
						|
        if mod is not None:
 | 
						|
            plugins.append(mod.obj)
 | 
						|
        inst = node.getparent(py.test.collect.Instance)
 | 
						|
        if inst is not None:
 | 
						|
            plugins.append(inst.obj)
 | 
						|
    return plugins
 | 
						|
 | 
						|
_notexists = object()
 | 
						|
class CallSpec:
 | 
						|
    def __init__(self, funcargs, id, param):
 | 
						|
        self.funcargs = funcargs 
 | 
						|
        self.id = id
 | 
						|
        if param is not _notexists:
 | 
						|
            self.param = param 
 | 
						|
    def __repr__(self):
 | 
						|
        return "<CallSpec id=%r param=%r funcargs=%r>" %(
 | 
						|
            self.id, getattr(self, 'param', '?'), self.funcargs)
 | 
						|
 | 
						|
class Metafunc:
 | 
						|
    def __init__(self, function, config=None, cls=None, module=None):
 | 
						|
        self.config = config
 | 
						|
        self.module = module 
 | 
						|
        self.function = function
 | 
						|
        self.funcargnames = getfuncargnames(function)
 | 
						|
        self.cls = cls
 | 
						|
        self.module = module
 | 
						|
        self._calls = []
 | 
						|
        self._ids = py.builtin.set()
 | 
						|
 | 
						|
    def addcall(self, funcargs=None, id=_notexists, param=_notexists):
 | 
						|
        assert funcargs is None or isinstance(funcargs, dict)
 | 
						|
        if id is None:
 | 
						|
            raise ValueError("id=None not allowed") 
 | 
						|
        if id is _notexists:
 | 
						|
            id = len(self._calls)
 | 
						|
        id = str(id)
 | 
						|
        if id in self._ids:
 | 
						|
            raise ValueError("duplicate id %r" % id)
 | 
						|
        self._ids.add(id)
 | 
						|
        self._calls.append(CallSpec(funcargs, id, param))
 | 
						|
 | 
						|
class FunctionCollector(py.test.collect.Collector):
 | 
						|
    def __init__(self, name, parent, calls):
 | 
						|
        super(FunctionCollector, self).__init__(name, parent)
 | 
						|
        self.calls = calls 
 | 
						|
        self.obj = getattr(self.parent.obj, name) 
 | 
						|
       
 | 
						|
    def collect(self):
 | 
						|
        l = []
 | 
						|
        for callspec in self.calls:
 | 
						|
            name = "%s[%s]" %(self.name, callspec.id)
 | 
						|
            function = self.parent.Function(name=name, parent=self, 
 | 
						|
                callspec=callspec, callobj=self.obj)
 | 
						|
            l.append(function)
 | 
						|
        return l
 | 
						|
 | 
						|
    def reportinfo(self):
 | 
						|
        try:
 | 
						|
            return self._fslineno, self.name
 | 
						|
        except AttributeError:
 | 
						|
            pass        
 | 
						|
        fspath, lineno = py.code.getfslineno(self.obj)
 | 
						|
        self._fslineno = fspath, lineno
 | 
						|
        return fspath, lineno, self.name
 | 
						|
    
 | 
						|
 | 
						|
class FuncargRequest:
 | 
						|
    _argprefix = "pytest_funcarg__"
 | 
						|
    _argname = None
 | 
						|
 | 
						|
    class Error(LookupError):
 | 
						|
        """ error on performing funcarg request. """ 
 | 
						|
 | 
						|
    def __init__(self, pyfuncitem):
 | 
						|
        self._pyfuncitem = pyfuncitem
 | 
						|
        self.function = pyfuncitem.obj
 | 
						|
        self.module = pyfuncitem.getparent(py.test.collect.Module).obj
 | 
						|
        clscol = pyfuncitem.getparent(py.test.collect.Class)
 | 
						|
        self.cls = clscol and clscol.obj or None
 | 
						|
        self.instance = py.builtin._getimself(self.function)
 | 
						|
        self.config = pyfuncitem.config
 | 
						|
        self.fspath = pyfuncitem.fspath
 | 
						|
        if hasattr(pyfuncitem, '_requestparam'):
 | 
						|
            self.param = pyfuncitem._requestparam 
 | 
						|
        self._plugins = getplugins(pyfuncitem, withpy=True)
 | 
						|
        self._funcargs  = self._pyfuncitem.funcargs.copy()
 | 
						|
        self._name2factory = {}
 | 
						|
        self._currentarg = None
 | 
						|
 | 
						|
    def _fillfuncargs(self):
 | 
						|
        argnames = getfuncargnames(self.function)
 | 
						|
        if argnames:
 | 
						|
            assert not getattr(self._pyfuncitem, '_args', None), (
 | 
						|
                "yielded functions cannot have funcargs")
 | 
						|
        for argname in argnames:
 | 
						|
            if argname not in self._pyfuncitem.funcargs:
 | 
						|
                self._pyfuncitem.funcargs[argname] = self.getfuncargvalue(argname)
 | 
						|
 | 
						|
    def cached_setup(self, setup, teardown=None, scope="module", extrakey=None):
 | 
						|
        """ cache and return result of calling setup().  
 | 
						|
 | 
						|
        The requested argument name, the scope and the ``extrakey`` 
 | 
						|
        determine the cache key.  The scope also determines when 
 | 
						|
        teardown(result) will be called.  valid scopes are: 
 | 
						|
        scope == 'function': when the single test function run finishes. 
 | 
						|
        scope == 'module': when tests in a different module are run
 | 
						|
        scope == 'session': when tests of the session have run. 
 | 
						|
        """
 | 
						|
        if not hasattr(self.config, '_setupcache'):
 | 
						|
            self.config._setupcache = {} # XXX weakref? 
 | 
						|
        cachekey = (self._currentarg, self._getscopeitem(scope), extrakey)
 | 
						|
        cache = self.config._setupcache
 | 
						|
        try:
 | 
						|
            val = cache[cachekey]
 | 
						|
        except KeyError:
 | 
						|
            val = setup()
 | 
						|
            cache[cachekey] = val 
 | 
						|
            if teardown is not None:
 | 
						|
                def finalizer():
 | 
						|
                    del cache[cachekey]
 | 
						|
                    teardown(val)
 | 
						|
                self._addfinalizer(finalizer, scope=scope)
 | 
						|
        return val 
 | 
						|
 | 
						|
    def getfuncargvalue(self, argname):
 | 
						|
        try:
 | 
						|
            return self._funcargs[argname]
 | 
						|
        except KeyError:
 | 
						|
            pass
 | 
						|
        if argname not in self._name2factory:
 | 
						|
            self._name2factory[argname] = self.config.pluginmanager.listattr(
 | 
						|
                    plugins=self._plugins, 
 | 
						|
                    attrname=self._argprefix + str(argname)
 | 
						|
            )
 | 
						|
        #else: we are called recursively  
 | 
						|
        if not self._name2factory[argname]:
 | 
						|
            self._raiselookupfailed(argname)
 | 
						|
        funcargfactory = self._name2factory[argname].pop()
 | 
						|
        oldarg = self._currentarg
 | 
						|
        self._currentarg = argname 
 | 
						|
        try:
 | 
						|
            self._funcargs[argname] = res = funcargfactory(request=self)
 | 
						|
        finally:
 | 
						|
            self._currentarg = oldarg
 | 
						|
        return res
 | 
						|
 | 
						|
    def _getscopeitem(self, scope):
 | 
						|
        if scope == "function":
 | 
						|
            return self._pyfuncitem
 | 
						|
        elif scope == "module":
 | 
						|
            return self._pyfuncitem.getparent(py.test.collect.Module)
 | 
						|
        elif scope == "session":
 | 
						|
            return None
 | 
						|
        raise ValueError("unknown finalization scope %r" %(scope,))
 | 
						|
 | 
						|
    def _addfinalizer(self, finalizer, scope):
 | 
						|
        colitem = self._getscopeitem(scope)
 | 
						|
        self.config._setupstate.addfinalizer(
 | 
						|
            finalizer=finalizer, colitem=colitem)
 | 
						|
 | 
						|
    def addfinalizer(self, finalizer):
 | 
						|
        """ call the given finalizer after test function finished execution. """ 
 | 
						|
        self._addfinalizer(finalizer, scope="function") 
 | 
						|
 | 
						|
    def __repr__(self):
 | 
						|
        return "<FuncargRequest for %r>" %(self._pyfuncitem)
 | 
						|
 | 
						|
    def _raiselookupfailed(self, argname):
 | 
						|
        available = []
 | 
						|
        for plugin in self._plugins:
 | 
						|
            for name in vars(plugin):
 | 
						|
                if name.startswith(self._argprefix):
 | 
						|
                    name = name[len(self._argprefix):]
 | 
						|
                    if name not in available:
 | 
						|
                        available.append(name) 
 | 
						|
        fspath, lineno, msg = self._pyfuncitem.reportinfo()
 | 
						|
        line = "%s:%s" %(fspath, lineno)
 | 
						|
        msg = "funcargument %r not found for: %s" %(argname, line)
 | 
						|
        msg += "\n available funcargs: %s" %(", ".join(available),)
 | 
						|
        raise self.Error(msg)
 |