make py lib a self-contained directory again
- move and merge _py/ bits back to py/ - fixes all around --HG-- branch : trunk
This commit is contained in:
300
py/impl/test/pluginmanager.py
Normal file
300
py/impl/test/pluginmanager.py
Normal file
@@ -0,0 +1,300 @@
|
||||
"""
|
||||
managing loading and interacting with pytest plugins.
|
||||
"""
|
||||
import py
|
||||
from py.plugin import hookspec
|
||||
from py.impl.test.outcome import Skipped
|
||||
|
||||
def check_old_use(mod, modname):
|
||||
clsname = modname[len('pytest_'):].capitalize() + "Plugin"
|
||||
assert not hasattr(mod, clsname), (mod, clsname)
|
||||
|
||||
class PluginManager(object):
|
||||
class Error(Exception):
|
||||
"""signals a plugin specific error."""
|
||||
def __init__(self, comregistry=None):
|
||||
if comregistry is None:
|
||||
comregistry = py._com.Registry()
|
||||
self.comregistry = comregistry
|
||||
self._name2plugin = {}
|
||||
|
||||
self.hook = py._com.HookRelay(
|
||||
hookspecs=hookspec,
|
||||
registry=self.comregistry)
|
||||
|
||||
def _getpluginname(self, plugin, name):
|
||||
if name is None:
|
||||
if hasattr(plugin, '__name__'):
|
||||
name = plugin.__name__.split(".")[-1]
|
||||
else:
|
||||
name = id(plugin)
|
||||
return name
|
||||
|
||||
def register(self, plugin, name=None):
|
||||
assert not self.isregistered(plugin)
|
||||
name = self._getpluginname(plugin, name)
|
||||
if name in self._name2plugin:
|
||||
return False
|
||||
self._name2plugin[name] = plugin
|
||||
self.hook.pytest_plugin_registered(plugin=plugin)
|
||||
self._checkplugin(plugin)
|
||||
self.comregistry.register(plugin)
|
||||
return True
|
||||
|
||||
def unregister(self, plugin):
|
||||
self.hook.pytest_plugin_unregistered(plugin=plugin)
|
||||
self.comregistry.unregister(plugin)
|
||||
for name, value in list(self._name2plugin.items()):
|
||||
if value == plugin:
|
||||
del self._name2plugin[name]
|
||||
|
||||
def isregistered(self, plugin, name=None):
|
||||
if self._getpluginname(plugin, name) in self._name2plugin:
|
||||
return True
|
||||
for val in self._name2plugin.values():
|
||||
if plugin == val:
|
||||
return True
|
||||
|
||||
def getplugins(self):
|
||||
return list(self.comregistry)
|
||||
|
||||
def getplugin(self, name):
|
||||
try:
|
||||
return self._name2plugin[name]
|
||||
except KeyError:
|
||||
impname = canonical_importname(name)
|
||||
return self._name2plugin[impname]
|
||||
|
||||
# API for bootstrapping
|
||||
#
|
||||
def _envlist(self, varname):
|
||||
val = py.std.os.environ.get(varname, None)
|
||||
if val is not None:
|
||||
return val.split(',')
|
||||
return ()
|
||||
|
||||
def consider_env(self):
|
||||
for spec in self._envlist("PYTEST_PLUGINS"):
|
||||
self.import_plugin(spec)
|
||||
|
||||
def consider_preparse(self, args):
|
||||
for opt1,opt2 in zip(args, args[1:]):
|
||||
if opt1 == "-p":
|
||||
self.import_plugin(opt2)
|
||||
|
||||
def consider_conftest(self, conftestmodule):
|
||||
cls = getattr(conftestmodule, 'ConftestPlugin', None)
|
||||
if cls is not None:
|
||||
raise ValueError("%r: 'ConftestPlugins' only existed till 1.0.0b1, "
|
||||
"were removed in 1.0.0b2" % (cls,))
|
||||
if self.register(conftestmodule, name=conftestmodule.__file__):
|
||||
self.consider_module(conftestmodule)
|
||||
|
||||
def consider_module(self, mod):
|
||||
attr = getattr(mod, "pytest_plugins", ())
|
||||
if attr:
|
||||
if not isinstance(attr, (list, tuple)):
|
||||
attr = (attr,)
|
||||
for spec in attr:
|
||||
self.import_plugin(spec)
|
||||
|
||||
def import_plugin(self, spec):
|
||||
assert isinstance(spec, str)
|
||||
modname = canonical_importname(spec)
|
||||
if modname in self._name2plugin:
|
||||
return
|
||||
try:
|
||||
mod = importplugin(modname)
|
||||
except KeyboardInterrupt:
|
||||
raise
|
||||
except Skipped:
|
||||
e = py.std.sys.exc_info()[1]
|
||||
self._warn("could not import plugin %r, reason: %r" %(
|
||||
(modname, e.msg)))
|
||||
else:
|
||||
check_old_use(mod, modname)
|
||||
self.register(mod)
|
||||
self.consider_module(mod)
|
||||
|
||||
def _warn(self, msg):
|
||||
print ("===WARNING=== %s" % (msg,))
|
||||
|
||||
def _checkplugin(self, plugin):
|
||||
# =====================================================
|
||||
# check plugin hooks
|
||||
# =====================================================
|
||||
methods = collectattr(plugin)
|
||||
hooks = collectattr(hookspec)
|
||||
stringio = py.io.TextIO()
|
||||
def Print(*args):
|
||||
if args:
|
||||
stringio.write(" ".join(map(str, args)))
|
||||
stringio.write("\n")
|
||||
|
||||
fail = False
|
||||
while methods:
|
||||
name, method = methods.popitem()
|
||||
#print "checking", name
|
||||
if isgenerichook(name):
|
||||
continue
|
||||
if name not in hooks:
|
||||
Print("found unknown hook:", name)
|
||||
fail = True
|
||||
else:
|
||||
method_args = getargs(method)
|
||||
if '__multicall__' in method_args:
|
||||
method_args.remove('__multicall__')
|
||||
hook = hooks[name]
|
||||
hookargs = getargs(hook)
|
||||
for arg in method_args:
|
||||
if arg not in hookargs:
|
||||
Print("argument %r not available" %(arg, ))
|
||||
Print("actual definition: %s" %(formatdef(method)))
|
||||
Print("available hook arguments: %s" %
|
||||
", ".join(hookargs))
|
||||
fail = True
|
||||
break
|
||||
#if not fail:
|
||||
# print "matching hook:", formatdef(method)
|
||||
if fail:
|
||||
name = getattr(plugin, '__name__', plugin)
|
||||
raise self.Error("%s:\n%s" %(name, stringio.getvalue()))
|
||||
#
|
||||
#
|
||||
# API for interacting with registered and instantiated plugin objects
|
||||
#
|
||||
#
|
||||
def listattr(self, attrname, plugins=None, extra=()):
|
||||
return self.comregistry.listattr(attrname, plugins=plugins, extra=extra)
|
||||
|
||||
def notify_exception(self, excinfo=None):
|
||||
if excinfo is None:
|
||||
excinfo = py.code.ExceptionInfo()
|
||||
excrepr = excinfo.getrepr(funcargs=True, showlocals=True)
|
||||
return self.hook.pytest_internalerror(excrepr=excrepr)
|
||||
|
||||
def do_addoption(self, parser):
|
||||
mname = "pytest_addoption"
|
||||
methods = self.comregistry.listattr(mname, reverse=True)
|
||||
mc = py._com.MultiCall(methods, {'parser': parser})
|
||||
mc.execute()
|
||||
|
||||
def pytest_plugin_registered(self, plugin):
|
||||
if hasattr(self, '_config'):
|
||||
self.call_plugin(plugin, "pytest_addoption",
|
||||
{'parser': self._config._parser})
|
||||
self.call_plugin(plugin, "pytest_configure",
|
||||
{'config': self._config})
|
||||
#dic = self.call_plugin(plugin, "pytest_namespace")
|
||||
#self._updateext(dic)
|
||||
|
||||
def call_plugin(self, plugin, methname, kwargs):
|
||||
return py._com.MultiCall(
|
||||
methods=self.listattr(methname, plugins=[plugin]),
|
||||
kwargs=kwargs, firstresult=True).execute()
|
||||
|
||||
def _updateext(self, dic):
|
||||
if dic:
|
||||
for name, value in dic.items():
|
||||
setattr(py.test, name, value)
|
||||
|
||||
def do_configure(self, config):
|
||||
assert not hasattr(self, '_config')
|
||||
config.pluginmanager.register(self)
|
||||
self._config = config
|
||||
config.hook.pytest_configure(config=self._config)
|
||||
for dic in config.hook.pytest_namespace() or []:
|
||||
self._updateext(dic)
|
||||
|
||||
def do_unconfigure(self, config):
|
||||
config = self._config
|
||||
del self._config
|
||||
config.hook.pytest_unconfigure(config=config)
|
||||
config.pluginmanager.unregister(self)
|
||||
|
||||
#
|
||||
# XXX old code to automatically load classes
|
||||
#
|
||||
def canonical_importname(name):
|
||||
name = name.lower()
|
||||
modprefix = "pytest_"
|
||||
if not name.startswith(modprefix):
|
||||
name = modprefix + name
|
||||
return name
|
||||
|
||||
def importplugin(importspec):
|
||||
try:
|
||||
return __import__(importspec)
|
||||
except ImportError:
|
||||
e = py.std.sys.exc_info()[1]
|
||||
if str(e).find(importspec) == -1:
|
||||
raise
|
||||
try:
|
||||
return __import__("py.plugin.%s" %(importspec),
|
||||
None, None, '__doc__')
|
||||
except ImportError:
|
||||
e = py.std.sys.exc_info()[1]
|
||||
if str(e).find(importspec) == -1:
|
||||
raise
|
||||
#print "syspath:", py.std.sys.path
|
||||
#print "curdir:", py.std.os.getcwd()
|
||||
return __import__(importspec) # show the original exception
|
||||
|
||||
|
||||
|
||||
def isgenerichook(name):
|
||||
return name == "pytest_plugins" or \
|
||||
name.startswith("pytest_funcarg__")
|
||||
|
||||
def getargs(func):
|
||||
args = py.std.inspect.getargs(py.code.getrawcode(func))[0]
|
||||
startindex = py.std.inspect.ismethod(func) and 1 or 0
|
||||
return args[startindex:]
|
||||
|
||||
def collectattr(obj, prefixes=("pytest_",)):
|
||||
methods = {}
|
||||
for apiname in dir(obj):
|
||||
for prefix in prefixes:
|
||||
if apiname.startswith(prefix):
|
||||
methods[apiname] = getattr(obj, apiname)
|
||||
return methods
|
||||
|
||||
def formatdef(func):
|
||||
return "%s%s" %(
|
||||
func.__name__,
|
||||
py.std.inspect.formatargspec(*py.std.inspect.getargspec(func))
|
||||
)
|
||||
|
||||
if __name__ == "__main__":
|
||||
import py.plugin
|
||||
basedir = py._dir.join('_plugin')
|
||||
name2text = {}
|
||||
for p in basedir.listdir("pytest_*"):
|
||||
if p.ext == ".py" or (
|
||||
p.check(dir=1) and p.join("__init__.py").check()):
|
||||
impname = p.purebasename
|
||||
if impname.find("__") != -1:
|
||||
continue
|
||||
try:
|
||||
plugin = importplugin(impname)
|
||||
except (ImportError, py.impl.test.outcome.Skipped):
|
||||
name2text[impname] = "IMPORT ERROR"
|
||||
else:
|
||||
doc = plugin.__doc__ or ""
|
||||
doc = doc.strip()
|
||||
name2text[impname] = doc
|
||||
|
||||
for name in sorted(name2text.keys()):
|
||||
text = name2text[name]
|
||||
if name[0] == "_":
|
||||
continue
|
||||
print ("%-20s %s" % (name, text.split("\n")[0]))
|
||||
|
||||
#text = py.std.textwrap.wrap(name2text[name],
|
||||
# width = 80,
|
||||
# initial_indent="%s: " % name,
|
||||
# replace_whitespace = False)
|
||||
#for line in text:
|
||||
# print line
|
||||
|
||||
|
||||
Reference in New Issue
Block a user