771 lines
		
	
	
		
			29 KiB
		
	
	
	
		
			Python
		
	
	
	
			
		
		
	
	
			771 lines
		
	
	
		
			29 KiB
		
	
	
	
		
			Python
		
	
	
	
| """
 | |
| PluginManager, basic initialization and tracing.
 | |
| 
 | |
| pluggy is the crystallized core of plugin management as used
 | |
| by some 150 plugins for pytest.
 | |
| 
 | |
| Pluggy uses semantic versioning. Breaking changes are only foreseen for
 | |
| Major releases (incremented X in "X.Y.Z").  If you want to use pluggy in
 | |
| your project you should thus use a dependency restriction like
 | |
| "pluggy>=0.1.0,<1.0" to avoid surprises.
 | |
| 
 | |
| pluggy is concerned with hook specification, hook implementations and hook
 | |
| calling.  For any given hook specification a hook call invokes up to N implementations.
 | |
| A hook implementation can influence its position and type of execution:
 | |
| if attributed "tryfirst" or "trylast" it will be tried to execute
 | |
| first or last.  However, if attributed "hookwrapper" an implementation
 | |
| can wrap all calls to non-hookwrapper implementations.  A hookwrapper
 | |
| can thus execute some code ahead and after the execution of other hooks.
 | |
| 
 | |
| Hook specification is done by way of a regular python function where
 | |
| both the function name and the names of all its arguments are significant.
 | |
| Each hook implementation function is verified against the original specification
 | |
| function, including the names of all its arguments.  To allow for hook specifications
 | |
| to evolve over the lifetime of a project, hook implementations can
 | |
| accept fewer arguments.  One can thus add new arguments and semantics to
 | |
| a hook specification by adding another argument typically without breaking
 | |
| existing hook implementations.
 | |
| 
 | |
| The chosen approach is meant to let a hook designer think carefully about
 | |
| which objects are needed by an extension writer.  By contrast, subclass-based
 | |
| extension mechanisms often expose a lot more state and behaviour than needed,
 | |
| thus restricting future developments.
 | |
| 
 | |
| Pluggy currently consists of functionality for:
 | |
| 
 | |
| - a way to register new hook specifications.  Without a hook
 | |
|   specification no hook calling can be performed.
 | |
| 
 | |
| - a registry of plugins which contain hook implementation functions.  It
 | |
|   is possible to register plugins for which a hook specification is not yet
 | |
|   known and validate all hooks when the system is in a more referentially
 | |
|   consistent state.  Setting an "optionalhook" attribution to a hook
 | |
|   implementation will avoid PluginValidationError's if a specification
 | |
|   is missing.  This allows to have optional integration between plugins.
 | |
| 
 | |
| - a "hook" relay object from which you can launch 1:N calls to
 | |
|   registered hook implementation functions
 | |
| 
 | |
| - a mechanism for ordering hook implementation functions
 | |
| 
 | |
| - mechanisms for two different types of 1:N calls: "firstresult" for when
 | |
|   the call should stop when the first implementation returns a non-None result.
 | |
|   And the other (default) way of guaranteeing that all hook implementations
 | |
|   will be called and their non-None result collected.
 | |
| 
 | |
| - mechanisms for "historic" extension points such that all newly
 | |
|   registered functions will receive all hook calls that happened
 | |
|   before their registration.
 | |
| 
 | |
| - a mechanism for discovering plugin objects which are based on
 | |
|   setuptools based entry points.
 | |
| 
 | |
| - a simple tracing mechanism, including tracing of plugin calls and
 | |
|   their arguments.
 | |
| 
 | |
| """
 | |
| import sys
 | |
| import inspect
 | |
| 
 | |
__version__ = '0.3.0'
# Names exported by a star-import of this module.
__all__ = ["PluginManager", "PluginValidationError",
           "HookspecMarker", "HookimplMarker"]

# True when running on Python 3; _CallOutcome.get_result() uses it to pick
# the re-raise syntax and it guards the py2-only definition of _reraise().
_py3 = sys.version_info > (3, 0)
 | |
| 
 | |
| 
 | |
class HookspecMarker:
    """ Decorator factory that tags functions as hook specifications.

    An instance is bound to a ``project_name``.  Functions marked by it get
    a ``<project_name>_spec`` attribute, which is what
    ``PluginManager.add_hookspecs`` looks for when the manager was created
    with the same ``project_name``.
    """

    def __init__(self, project_name):
        self.project_name = project_name

    def __call__(self, function=None, firstresult=False, historic=False):
        """ Mark ``function`` as a hook specification, or, when no function
        is passed, return a decorator that applies the given options to
        whatever function it later receives.

        ``firstresult=True`` makes the 1:N hook call stop at the first
        implementation that returns a non-None result.

        ``historic=True`` makes calls to the hook get memorized and replayed
        on plugins that are registered later.

        The two options are mutually exclusive; the ValueError is raised at
        the moment a function is actually marked.
        """
        def setattr_hookspec_opts(func):
            if firstresult and historic:
                raise ValueError("cannot have a historic firstresult hook")
            options = {"firstresult": firstresult, "historic": historic}
            setattr(func, self.project_name + "_spec", options)
            return func

        # bare call without a function: hand back the decorator itself
        if function is None:
            return setattr_hookspec_opts
        return setattr_hookspec_opts(function)
 | |
| 
 | |
| 
 | |
class HookimplMarker:
    """ Decorator factory that tags functions as hook implementations.

    An instance is bound to a ``project_name``.  Functions marked by it get
    a ``<project_name>_impl`` attribute, which is what
    ``PluginManager.register`` looks for when the manager was created with
    the same ``project_name``.
    """

    def __init__(self, project_name):
        self.project_name = project_name

    def __call__(self, function=None, hookwrapper=False, optionalhook=False,
                 tryfirst=False, trylast=False):
        """ Mark ``function`` as a hook implementation, or, when no function
        is passed, return a decorator that applies the given options to
        whatever function it later receives.

        ``optionalhook=True``: a missing matching hook specification is not
        an error (by default it is).

        ``tryfirst=True``: run this implementation as early as possible in
        the chain of N implementations for a specification.

        ``trylast=True``: run this implementation as late as possible in
        that chain.

        ``hookwrapper=True``: the implementation must execute exactly one
        ``yield``.  Code before the yield runs before any non-hookwrapper
        function; code after the yield runs after all of them.  The yield
        receives a ``_CallOutcome`` object representing the result or
        exception of the inner calls (including other hookwrapper calls).
        """
        def setattr_hookimpl_opts(func):
            options = {
                "hookwrapper": hookwrapper,
                "optionalhook": optionalhook,
                "tryfirst": tryfirst,
                "trylast": trylast,
            }
            setattr(func, self.project_name + "_impl", options)
            return func

        if function is not None:
            return setattr_hookimpl_opts(function)
        # bare call without a function: hand back the decorator itself
        return setattr_hookimpl_opts
 | |
| 
 | |
| 
 | |
def normalize_hookimpl_opts(opts):
    """ Fill in ``False`` for every hook implementation option that is
    missing from the ``opts`` dict.  The dict is modified in place. """
    for option in ("tryfirst", "trylast", "hookwrapper", "optionalhook"):
        opts.setdefault(option, False)
 | |
| 
 | |
| 
 | |
| class _TagTracer:
 | |
|     def __init__(self):
 | |
|         self._tag2proc = {}
 | |
|         self.writer = None
 | |
|         self.indent = 0
 | |
| 
 | |
|     def get(self, name):
 | |
|         return _TagTracerSub(self, (name,))
 | |
| 
 | |
|     def format_message(self, tags, args):
 | |
|         if isinstance(args[-1], dict):
 | |
|             extra = args[-1]
 | |
|             args = args[:-1]
 | |
|         else:
 | |
|             extra = {}
 | |
| 
 | |
|         content = " ".join(map(str, args))
 | |
|         indent = "  " * self.indent
 | |
| 
 | |
|         lines = [
 | |
|             "%s%s [%s]\n" % (indent, content, ":".join(tags))
 | |
|         ]
 | |
| 
 | |
|         for name, value in extra.items():
 | |
|             lines.append("%s    %s: %s\n" % (indent, name, value))
 | |
|         return lines
 | |
| 
 | |
|     def processmessage(self, tags, args):
 | |
|         if self.writer is not None and args:
 | |
|             lines = self.format_message(tags, args)
 | |
|             self.writer(''.join(lines))
 | |
|         try:
 | |
|             self._tag2proc[tags](tags, args)
 | |
|         except KeyError:
 | |
|             pass
 | |
| 
 | |
|     def setwriter(self, writer):
 | |
|         self.writer = writer
 | |
| 
 | |
|     def setprocessor(self, tags, processor):
 | |
|         if isinstance(tags, str):
 | |
|             tags = tuple(tags.split(":"))
 | |
|         else:
 | |
|             assert isinstance(tags, tuple)
 | |
|         self._tag2proc[tags] = processor
 | |
| 
 | |
| 
 | |
| class _TagTracerSub:
 | |
|     def __init__(self, root, tags):
 | |
|         self.root = root
 | |
|         self.tags = tags
 | |
| 
 | |
|     def __call__(self, *args):
 | |
|         self.root.processmessage(self.tags, args)
 | |
| 
 | |
|     def setmyprocessor(self, processor):
 | |
|         self.root.setprocessor(self.tags, processor)
 | |
| 
 | |
|     def get(self, name):
 | |
|         return self.__class__(self.root, self.tags + (name,))
 | |
| 
 | |
| 
 | |
| def _raise_wrapfail(wrap_controller, msg):
 | |
|     co = wrap_controller.gi_code
 | |
|     raise RuntimeError("wrap_controller at %r %s:%d %s" %
 | |
|                    (co.co_name, co.co_filename, co.co_firstlineno, msg))
 | |
| 
 | |
| 
 | |
def _wrapped_call(wrap_controller, func):
    """ Run ``func`` wrapped by the generator ``wrap_controller``.

    The generator must yield exactly once.  It is advanced to that yield,
    then ``func`` is called and its ``_CallOutcome`` is sent back into the
    generator at the yield point.  The generator must then finish (raise
    StopIteration); a missing or a second yield raises RuntimeError.
    Returns the outcome's result, or re-raises its exception.
    """
    try:
        next(wrap_controller)   # run up to the first yield
    except StopIteration:
        _raise_wrapfail(wrap_controller, "did not yield")

    outcome = _CallOutcome(func)
    try:
        wrap_controller.send(outcome)
    except StopIteration:
        # the generator finished after its single yield, as required
        pass
    else:
        _raise_wrapfail(wrap_controller, "has second yield")
    return outcome.get_result()
 | |
| 
 | |
| 
 | |
| class _CallOutcome:
 | |
|     """ Outcome of a function call, either an exception or a proper result.
 | |
|     Calling the ``get_result`` method will return the result or reraise
 | |
|     the exception raised when the function was called. """
 | |
|     excinfo = None
 | |
| 
 | |
|     def __init__(self, func):
 | |
|         try:
 | |
|             self.result = func()
 | |
|         except BaseException:
 | |
|             self.excinfo = sys.exc_info()
 | |
| 
 | |
|     def force_result(self, result):
 | |
|         self.result = result
 | |
|         self.excinfo = None
 | |
| 
 | |
|     def get_result(self):
 | |
|         if self.excinfo is None:
 | |
|             return self.result
 | |
|         else:
 | |
|             ex = self.excinfo
 | |
|             if _py3:
 | |
|                 raise ex[1].with_traceback(ex[2])
 | |
|             _reraise(*ex)  # noqa
 | |
| 
 | |
# Python 2 only: the three-expression ``raise`` below is a syntax error on
# Python 3, so the helper is defined through exec() and only on Python 2.
# _CallOutcome.get_result() calls it to re-raise with the original traceback.
if not _py3:
    exec("""
def _reraise(cls, val, tb):
    raise cls, val, tb
""")
 | |
| 
 | |
| 
 | |
| class _TracedHookExecution:
 | |
|     def __init__(self, pluginmanager, before, after):
 | |
|         self.pluginmanager = pluginmanager
 | |
|         self.before = before
 | |
|         self.after = after
 | |
|         self.oldcall = pluginmanager._inner_hookexec
 | |
|         assert not isinstance(self.oldcall, _TracedHookExecution)
 | |
|         self.pluginmanager._inner_hookexec = self
 | |
| 
 | |
|     def __call__(self, hook, hook_impls, kwargs):
 | |
|         self.before(hook.name, hook_impls, kwargs)
 | |
|         outcome = _CallOutcome(lambda: self.oldcall(hook, hook_impls, kwargs))
 | |
|         self.after(outcome, hook.name, hook_impls, kwargs)
 | |
|         return outcome.get_result()
 | |
| 
 | |
|     def undo(self):
 | |
|         self.pluginmanager._inner_hookexec = self.oldcall
 | |
| 
 | |
| 
 | |
class PluginManager(object):
    """ Core Pluginmanager class which manages registration
    of plugin objects and 1:N hook calling.

    You can add new hook specifications by calling
    ``add_hookspecs(module_or_class)``.
    You can register plugin objects (which contain hook implementations) by
    calling ``register(plugin)``.  Implementations are recognized through
    the ``<project_name>_impl`` attribute set by ``HookimplMarker`` or, if
    the manager was created with an ``implprefix``, through attribute names
    starting with that prefix.

    For debugging purposes you can call ``enable_tracing()``
    which will subsequently send debug information to the trace helper.
    """

    def __init__(self, project_name, implprefix=None):
        """ if implprefix is given implementation functions
        will be recognized if their name matches the implprefix. """
        self.project_name = project_name
        self._name2plugin = {}          # name -> plugin; None marks a blocked name
        self._plugin2hookcallers = {}   # plugin -> list of hook callers it takes part in
        self._plugin_distinfo = []      # (plugin, dist) pairs loaded from entry points
        self.trace = _TagTracer().get("pluginmanage")
        self.hook = _HookRelay(self.trace.root.get("hook"))
        self._implprefix = implprefix
        self._inner_hookexec = lambda hook, methods, kwargs: \
            _MultiCall(methods, kwargs, hook.spec_opts).execute()

    def _hookexec(self, hook, methods, kwargs):
        # called from all hookcaller instances.
        # enable_tracing will set its own wrapping function at self._inner_hookexec
        return self._inner_hookexec(hook, methods, kwargs)

    def register(self, plugin, name=None):
        """ Register a plugin and return its canonical name or None if the name
        is blocked from registering.  Raise a ValueError if the plugin is already
        registered. """
        plugin_name = name or self.get_canonical_name(plugin)

        if plugin_name in self._name2plugin or plugin in self._plugin2hookcallers:
            if self._name2plugin.get(plugin_name, -1) is None:
                return  # blocked plugin, return None to indicate no registration
            raise ValueError("Plugin already registered: %s=%s\n%s" %
                             (plugin_name, plugin, self._name2plugin))

        # XXX if an error happens we should make sure no state has been
        # changed at point of return
        self._name2plugin[plugin_name] = plugin

        # register matching hook implementations of the plugin
        self._plugin2hookcallers[plugin] = hookcallers = []
        for attr_name in dir(plugin):
            hookimpl_opts = self.parse_hookimpl_opts(plugin, attr_name)
            if hookimpl_opts is None:
                continue
            normalize_hookimpl_opts(hookimpl_opts)
            method = getattr(plugin, attr_name)
            hookimpl = HookImpl(plugin, plugin_name, method, hookimpl_opts)
            hook = getattr(self.hook, attr_name, None)
            if hook is None:
                # no spec and no other impl known yet: create a bare caller
                hook = _HookCaller(attr_name, self._hookexec)
                setattr(self.hook, attr_name, hook)
            elif hook.has_spec():
                self._verify_hook(hook, hookimpl)
                hook._maybe_apply_history(hookimpl)
            hook._add_hookimpl(hookimpl)
            hookcallers.append(hook)
        return plugin_name

    def parse_hookimpl_opts(self, plugin, name):
        """ Return the hook implementation options dict for the attribute
        ``name`` of ``plugin``, or None if it is not a hook implementation.

        A non-dict ``<project_name>_impl`` attribute is treated as a false
        positive.  Without such an attribute, a name starting with the
        implprefix yields an empty options dict.
        """
        method = getattr(plugin, name)
        res = getattr(method, self.project_name + "_impl", None)
        if res is not None and not isinstance(res, dict):
            # false positive
            res = None
        elif res is None and self._implprefix and name.startswith(self._implprefix):
            res = {}
        return res

    def unregister(self, plugin=None, name=None):
        """ unregister a plugin object and all its contained hook implementations
        from internal data structures.  One of ``plugin`` or ``name`` must be
        given.  Return the plugin object (None if the name is unknown or
        blocked). """
        if name is None:
            assert plugin is not None, "one of name or plugin needs to be specified"
            name = self.get_name(plugin)

        if plugin is None:
            plugin = self.get_plugin(name)

        # if self._name2plugin[name] == None registration was blocked: ignore.
        # Compare against None explicitly so that a plugin object which is
        # falsy (e.g. defines __len__ or __bool__) is still removed.
        if self._name2plugin.get(name) is not None:
            del self._name2plugin[name]

        for hookcaller in self._plugin2hookcallers.pop(plugin, []):
            hookcaller._remove_plugin(plugin)

        return plugin

    def set_blocked(self, name):
        """ block registrations of the given name, unregister if already registered. """
        self.unregister(name=name)
        self._name2plugin[name] = None

    def is_blocked(self, name):
        """ return True if the name blocks registering plugins of that name. """
        return name in self._name2plugin and self._name2plugin[name] is None

    def add_hookspecs(self, module_or_class):
        """ add new hook specifications defined in the given module_or_class.
        Functions are recognized if they have been decorated accordingly.
        Raise ValueError if no specification is found. """
        names = []
        for name in dir(module_or_class):
            spec_opts = self.parse_hookspec_opts(module_or_class, name)
            if spec_opts is not None:
                hc = getattr(self.hook, name, None)
                if hc is None:
                    hc = _HookCaller(name, self._hookexec, module_or_class, spec_opts)
                    setattr(self.hook, name, hc)
                else:
                    # plugins registered this hook without knowing the spec
                    hc.set_specification(module_or_class, spec_opts)
                    for hookfunction in (hc._wrappers + hc._nonwrappers):
                        self._verify_hook(hc, hookfunction)
                names.append(name)

        if not names:
            raise ValueError("did not find any %r hooks in %r" %
                             (self.project_name, module_or_class))

    def parse_hookspec_opts(self, module_or_class, name):
        """ Return the ``<project_name>_spec`` options attached to the
        attribute ``name`` of ``module_or_class``, or None. """
        method = getattr(module_or_class, name)
        return getattr(method, self.project_name + "_spec", None)

    def get_plugins(self):
        """ return the set of registered plugins. """
        return set(self._plugin2hookcallers)

    def is_registered(self, plugin):
        """ Return True if the plugin is already registered. """
        return plugin in self._plugin2hookcallers

    def get_canonical_name(self, plugin):
        """ Return canonical name for a plugin object. Note that a plugin
        may be registered under a different name which was specified
        by the caller of register(plugin, name). To obtain the name
        of a registered plugin use ``get_name(plugin)`` instead."""
        return getattr(plugin, "__name__", None) or str(id(plugin))

    def get_plugin(self, name):
        """ Return a plugin or None for the given name. """
        return self._name2plugin.get(name)

    def get_name(self, plugin):
        """ Return name for registered plugin or None if not registered. """
        for name, val in self._name2plugin.items():
            if plugin == val:
                return name

    def _verify_hook(self, hook, hookimpl):
        """ Raise PluginValidationError if ``hookimpl`` is a hookwrapper for
        a historic hook or takes an argument the specification lacks. """
        if hook.is_historic() and hookimpl.hookwrapper:
            raise PluginValidationError(
                "Plugin %r\nhook %r\nhistoric incompatible to hookwrapper" %
                (hookimpl.plugin_name, hook.name))

        for arg in hookimpl.argnames:
            if arg not in hook.argnames:
                raise PluginValidationError(
                    "Plugin %r\nhook %r\nargument %r not available\n"
                    "plugin definition: %s\n"
                    "available hookargs: %s" %
                    (hookimpl.plugin_name, hook.name, arg,
                     _formatdef(hookimpl.function), ", ".join(hook.argnames)))

    def check_pending(self):
        """ Verify that all hooks which have not been verified against
        a hook specification are optional, otherwise raise PluginValidationError"""
        for name in self.hook.__dict__:
            # names with a leading underscore are the relay's own attributes
            if name[0] != "_":
                hook = getattr(self.hook, name)
                if not hook.has_spec():
                    for hookimpl in (hook._wrappers + hook._nonwrappers):
                        if not hookimpl.optionalhook:
                            raise PluginValidationError(
                                "unknown hook %r in plugin %r" %
                                (name, hookimpl.plugin))

    def load_setuptools_entrypoints(self, entrypoint_name):
        """ Load modules from querying the specified setuptools entrypoint name.
        Return the total number of plugins loaded from entry points by this
        manager so far (not only by this call). """
        from pkg_resources import iter_entry_points, DistributionNotFound
        for ep in iter_entry_points(entrypoint_name):
            # is the plugin registered or blocked?  Test against None so a
            # falsy plugin object still counts as registered.
            if self.get_plugin(ep.name) is not None or self.is_blocked(ep.name):
                continue
            try:
                plugin = ep.load()
            except DistributionNotFound:
                continue
            self.register(plugin, name=ep.name)
            self._plugin_distinfo.append((plugin, ep.dist))
        return len(self._plugin_distinfo)

    def list_plugin_distinfo(self):
        """ return list of (plugin, distinfo) tuples for all setuptools
        registered plugins. """
        return list(self._plugin_distinfo)

    def list_name_plugin(self):
        """ return list of name/plugin pairs. """
        return list(self._name2plugin.items())

    def get_hookcallers(self, plugin):
        """ get all hook callers for the specified plugin. """
        return self._plugin2hookcallers.get(plugin)

    def add_hookcall_monitoring(self, before, after):
        """ add before/after tracing functions for all hooks
        and return an undo function which, when called,
        will remove the added tracers.

        ``before(hook_name, hook_impls, kwargs)`` will be called ahead
        of all hook calls and receive the name of the hook, a list
        of HookImpl instances and the keyword arguments for the hook call.

        ``after(outcome, hook_name, hook_impls, kwargs)`` receives the
        same arguments as ``before`` but also a :py:class:`_CallOutcome` object
        which represents the result of the overall hook call.
        """
        return _TracedHookExecution(self, before, after).undo

    def enable_tracing(self):
        """ enable tracing of hook calls and return an undo function. """
        hooktrace = self.hook._trace

        def before(hook_name, methods, kwargs):
            hooktrace.root.indent += 1
            hooktrace(hook_name, kwargs)

        def after(outcome, hook_name, methods, kwargs):
            # a failed call is not reported, only the indentation is undone
            if outcome.excinfo is None:
                hooktrace("finish", hook_name, "-->", outcome.result)
            hooktrace.root.indent -= 1

        return self.add_hookcall_monitoring(before, after)

    def subset_hook_caller(self, name, remove_plugins):
        """ Return a new _HookCaller instance for the named method
        which manages calls to all registered plugins except the
        ones from remove_plugins.  If none of ``remove_plugins`` has an
        attribute called ``name`` the original hook caller is returned. """
        orig = getattr(self.hook, name)
        plugins_to_remove = [plug for plug in remove_plugins if hasattr(plug, name)]
        if plugins_to_remove:
            # a hook without a specification has neither attribute yet;
            # None/None creates an equally unspecified caller
            hc = _HookCaller(orig.name, orig._hookexec,
                             getattr(orig, "_specmodule_or_class", None),
                             getattr(orig, "spec_opts", None))
            for hookimpl in (orig._wrappers + orig._nonwrappers):
                plugin = hookimpl.plugin
                if plugin not in plugins_to_remove:
                    hc._add_hookimpl(hookimpl)
                    # we also keep track of this hook caller so it
                    # gets properly removed on plugin unregistration
                    self._plugin2hookcallers.setdefault(plugin, []).append(hc)
            return hc
        return orig
 | |
| 
 | |
| 
 | |
| class _MultiCall:
 | |
|     """ execute a call into multiple python functions/methods. """
 | |
| 
 | |
|     # XXX note that the __multicall__ argument is supported only
 | |
|     # for pytest compatibility reasons.  It was never officially
 | |
|     # supported there and is explicitely deprecated since 2.8
 | |
|     # so we can remove it soon, allowing to avoid the below recursion
 | |
|     # in execute() and simplify/speed up the execute loop.
 | |
| 
 | |
|     def __init__(self, hook_impls, kwargs, specopts={}):
 | |
|         self.hook_impls = hook_impls
 | |
|         self.kwargs = kwargs
 | |
|         self.kwargs["__multicall__"] = self
 | |
|         self.specopts = specopts
 | |
| 
 | |
|     def execute(self):
 | |
|         all_kwargs = self.kwargs
 | |
|         self.results = results = []
 | |
|         firstresult = self.specopts.get("firstresult")
 | |
| 
 | |
|         while self.hook_impls:
 | |
|             hook_impl = self.hook_impls.pop()
 | |
|             args = [all_kwargs[argname] for argname in hook_impl.argnames]
 | |
|             if hook_impl.hookwrapper:
 | |
|                 return _wrapped_call(hook_impl.function(*args), self.execute)
 | |
|             res = hook_impl.function(*args)
 | |
|             if res is not None:
 | |
|                 if firstresult:
 | |
|                     return res
 | |
|                 results.append(res)
 | |
| 
 | |
|         if not firstresult:
 | |
|             return results
 | |
| 
 | |
|     def __repr__(self):
 | |
|         status = "%d meths" % (len(self.hook_impls),)
 | |
|         if hasattr(self, "results"):
 | |
|             status = ("%d results, " % len(self.results)) + status
 | |
|         return "<_MultiCall %s, kwargs=%r>" % (status, self.kwargs)
 | |
| 
 | |
| 
 | |
def varnames(func, startindex=None):
    """ Return the tuple of positional argument names without a default
    value for a function, method, class or other callable.

    For a class the arguments of its ``__init__`` are used (without the
    first one).  For other objects that are neither functions nor methods
    their ``__call__`` is inspected.  ``startindex`` is the number of
    leading arguments to skip; when not given it is 1 for bound methods
    and 0 otherwise.  Objects without a code object yield ``()``.

    The result is stored under ``"_varnames"`` in the ``__dict__`` of the
    object passed in, when that is possible, and reused on later calls.
    """
    cache = getattr(func, "__dict__", {})
    try:
        return cache["_varnames"]
    except KeyError:
        pass   # not computed yet

    if inspect.isclass(func):
        try:
            func = func.__init__
        except AttributeError:
            return ()
        startindex = 1
    else:
        is_plain_callable = inspect.isfunction(func) or inspect.ismethod(func)
        if not is_plain_callable:
            # callable instance: look at its __call__ instead
            func = getattr(func, '__call__', func)
        if startindex is None:
            startindex = int(inspect.ismethod(func))

    try:
        rawcode = func.__code__
    except AttributeError:
        return ()

    try:
        names = rawcode.co_varnames[startindex:rawcode.co_argcount]
    except AttributeError:
        names = ()
    else:
        defaults = func.__defaults__
        if defaults:
            # arguments with a default value are not reported
            names = names[:-len(defaults)]

    try:
        cache["_varnames"] = names
    except TypeError:
        # e.g. the read-only __dict__ of a class: skip caching
        pass
    return names
 | |
| 
 | |
| 
 | |
| class _HookRelay:
 | |
|     """ hook holder object for performing 1:N hook calls where N is the number
 | |
|     of registered plugins.
 | |
| 
 | |
|     """
 | |
| 
 | |
|     def __init__(self, trace):
 | |
|         self._trace = trace
 | |
| 
 | |
| 
 | |
| class _HookCaller(object):
 | |
|     def __init__(self, name, hook_execute, specmodule_or_class=None, spec_opts=None):
 | |
|         self.name = name
 | |
|         self._wrappers = []
 | |
|         self._nonwrappers = []
 | |
|         self._hookexec = hook_execute
 | |
|         if specmodule_or_class is not None:
 | |
|             assert spec_opts is not None
 | |
|             self.set_specification(specmodule_or_class, spec_opts)
 | |
| 
 | |
|     def has_spec(self):
 | |
|         return hasattr(self, "_specmodule_or_class")
 | |
| 
 | |
|     def set_specification(self, specmodule_or_class, spec_opts):
 | |
|         assert not self.has_spec()
 | |
|         self._specmodule_or_class = specmodule_or_class
 | |
|         specfunc = getattr(specmodule_or_class, self.name)
 | |
|         argnames = varnames(specfunc, startindex=inspect.isclass(specmodule_or_class))
 | |
|         assert "self" not in argnames  # sanity check
 | |
|         self.argnames = ["__multicall__"] + list(argnames)
 | |
|         self.spec_opts = spec_opts
 | |
|         if spec_opts.get("historic"):
 | |
|             self._call_history = []
 | |
| 
 | |
|     def is_historic(self):
 | |
|         return hasattr(self, "_call_history")
 | |
| 
 | |
|     def _remove_plugin(self, plugin):
 | |
|         def remove(wrappers):
 | |
|             for i, method in enumerate(wrappers):
 | |
|                 if method.plugin == plugin:
 | |
|                     del wrappers[i]
 | |
|                     return True
 | |
|         if remove(self._wrappers) is None:
 | |
|             if remove(self._nonwrappers) is None:
 | |
|                 raise ValueError("plugin %r not found" % (plugin,))
 | |
| 
 | |
|     def _add_hookimpl(self, hookimpl):
 | |
|         if hookimpl.hookwrapper:
 | |
|             methods = self._wrappers
 | |
|         else:
 | |
|             methods = self._nonwrappers
 | |
| 
 | |
|         if hookimpl.trylast:
 | |
|             methods.insert(0, hookimpl)
 | |
|         elif hookimpl.tryfirst:
 | |
|             methods.append(hookimpl)
 | |
|         else:
 | |
|             # find last non-tryfirst method
 | |
|             i = len(methods) - 1
 | |
|             while i >= 0 and methods[i].tryfirst:
 | |
|                 i -= 1
 | |
|             methods.insert(i + 1, hookimpl)
 | |
| 
 | |
|     def __repr__(self):
 | |
|         return "<_HookCaller %r>" % (self.name,)
 | |
| 
 | |
|     def __call__(self, **kwargs):
 | |
|         assert not self.is_historic()
 | |
|         return self._hookexec(self, self._nonwrappers + self._wrappers, kwargs)
 | |
| 
 | |
|     def call_historic(self, proc=None, kwargs=None):
 | |
|         self._call_history.append((kwargs or {}, proc))
 | |
|         # historizing hooks don't return results
 | |
|         self._hookexec(self, self._nonwrappers + self._wrappers, kwargs)
 | |
| 
 | |
|     def call_extra(self, methods, kwargs):
 | |
|         """ Call the hook with some additional temporarily participating
 | |
|         methods using the specified kwargs as call parameters. """
 | |
|         old = list(self._nonwrappers), list(self._wrappers)
 | |
|         for method in methods:
 | |
|             opts = dict(hookwrapper=False, trylast=False, tryfirst=False)
 | |
|             hookimpl = HookImpl(None, "<temp>", method, opts)
 | |
|             self._add_hookimpl(hookimpl)
 | |
|         try:
 | |
|             return self(**kwargs)
 | |
|         finally:
 | |
|             self._nonwrappers, self._wrappers = old
 | |
| 
 | |
|     def _maybe_apply_history(self, method):
 | |
|         if self.is_historic():
 | |
|             for kwargs, proc in self._call_history:
 | |
|                 res = self._hookexec(self, [method], kwargs)
 | |
|                 if res and proc is not None:
 | |
|                     proc(res[0])
 | |
| 
 | |
| 
 | |
class HookImpl:
    """ One hook implementation: the function, the plugin it belongs to,
    the name that plugin is registered under, and its options. """

    def __init__(self, plugin, plugin_name, function, hook_impl_opts):
        self.function = function
        self.argnames = varnames(function)
        self.plugin = plugin
        self.plugin_name = plugin_name
        self.opts = hook_impl_opts
        # Applied last: every option also becomes an attribute of the same
        # name (e.g. ``hookwrapper``, ``tryfirst``), overriding the above
        # on a name clash.
        vars(self).update(hook_impl_opts)
 | |
| 
 | |
| 
 | |
class PluginValidationError(Exception):
    """ Raised when a plugin fails validation. """
 | |
| 
 | |
| 
 | |
| def _formatdef(func):
 | |
|     return "%s%s" % (
 | |
|         func.__name__,
 | |
|         inspect.formatargspec(*inspect.getargspec(func))
 | |
|     )
 |