forked from bton/matekasse
602 lines
20 KiB
Python
602 lines
20 KiB
Python
"""
|
|
Internal hook annotation, representation and calling machinery.
|
|
"""
|
|
from __future__ import annotations
|
|
|
|
import inspect
|
|
import sys
|
|
import warnings
|
|
from types import ModuleType
|
|
from typing import AbstractSet
|
|
from typing import Any
|
|
from typing import Callable
|
|
from typing import Generator
|
|
from typing import List
|
|
from typing import Mapping
|
|
from typing import Optional
|
|
from typing import overload
|
|
from typing import Sequence
|
|
from typing import Tuple
|
|
from typing import TYPE_CHECKING
|
|
from typing import TypeVar
|
|
from typing import Union
|
|
|
|
from ._result import _Result
|
|
|
|
if TYPE_CHECKING:
|
|
from typing_extensions import TypedDict
|
|
from typing_extensions import Final
|
|
|
|
|
|
# Generic type of the value produced by a hook implementation.
_T = TypeVar("_T")
# Any callable; used so the marker decorators hand back the decorated function
# with its original type.
_F = TypeVar("_F", bound=Callable[..., object])
# A namespace holding hook specifications: either a module or a class.
_Namespace = Union[ModuleType, type]
# Plugins are typed as plain objects; no particular base class is required.
_Plugin = object
# Signature of the hook execution callback:
# (hook name, hook implementations, call kwargs, firstresult flag)
# -> a single result or a list of results.
_HookExec = Callable[
    [str, Sequence["HookImpl"], Mapping[str, object], bool],
    Union[object, List[object]],
]
# A hook implementation function: either returns its result directly, or is a
# generator that is sent a ``_Result`` and returns nothing.
_HookImplFunction = Callable[..., Union[_T, Generator[None, _Result[_T], None]]]
|
|
if TYPE_CHECKING:
    # These TypedDicts are only defined for static type checking; the option
    # mappings built by the markers in this module are plain dict literals.

    class _HookSpecOpts(TypedDict):
        """Options attached to a function by :class:`HookspecMarker`."""

        # Stop the hook call at the first non-``None`` result.
        firstresult: bool
        # Memorize calls and replay them on plugins registered afterwards.
        historic: bool
        # Optional warning tied to the spec; presumably issued when the hook
        # is implemented (handled outside this module - TODO confirm).
        warn_on_impl: Warning | None

    class _HookImplOpts(TypedDict):
        """Options attached to a function by :class:`HookimplMarker`."""

        # New-style hook wrapper.
        wrapper: bool
        # Old-style hook wrapper (receives a ``_Result`` at its ``yield``).
        hookwrapper: bool
        # A missing matching hook specification is not an error.
        optionalhook: bool
        # Run as early as possible in the chain of implementations.
        tryfirst: bool
        # Run as late as possible in the chain of implementations.
        trylast: bool
        # Name to match against hook specifications instead of the function
        # name, or ``None`` to use the function name.
        specname: str | None
|
|
|
|
|
|
class HookspecMarker:
    """Decorator factory that tags functions as hook specifications.

    Create an instance with a ``project_name`` and use the instance as a
    decorator. A later call to :meth:`PluginManager.add_hookspecs` will find
    every tagged function, provided the :class:`PluginManager` was created
    with the same ``project_name``.
    """

    __slots__ = ("project_name",)

    def __init__(self, project_name: str) -> None:
        self.project_name: Final = project_name

    @overload
    def __call__(
        self,
        function: _F,
        firstresult: bool = False,
        historic: bool = False,
        warn_on_impl: Warning | None = None,
    ) -> _F:
        ...

    @overload  # noqa: F811
    def __call__(  # noqa: F811
        self,
        function: None = ...,
        firstresult: bool = ...,
        historic: bool = ...,
        warn_on_impl: Warning | None = ...,
    ) -> Callable[[_F], _F]:
        ...

    def __call__(  # noqa: F811
        self,
        function: _F | None = None,
        firstresult: bool = False,
        historic: bool = False,
        warn_on_impl: Warning | None = None,
    ) -> _F | Callable[[_F], _F]:
        """Tag ``function`` as a hook specification, or build a decorator.

        With a function argument, the spec options are stored on that
        function right away (as the attribute ``<project_name>_spec``) so
        that :meth:`PluginManager.add_hookspecs` can discover it, and the
        function itself is returned.

        Without a function argument, a decorator carrying the given options
        is returned; applying it to a function has the same effect.

        ``firstresult=True`` makes the 1:N hook call (N being the number of
        registered implementations) stop at the I'th implementation (I<=N)
        that returns a non-``None`` result.

        ``historic=True`` makes every call to the hook be memorized and
        replayed on plugins that are registered after the call was made.

        :raises ValueError:
            When a function is tagged with both ``historic`` and
            ``firstresult`` enabled.
        """

        def mark(target: _F) -> _F:
            # The two modes are mutually exclusive; this is checked when the
            # function is actually tagged, not when the decorator is built.
            if historic and firstresult:
                raise ValueError("cannot have a historic firstresult hook")
            spec_opts: _HookSpecOpts = {
                "firstresult": firstresult,
                "historic": historic,
                "warn_on_impl": warn_on_impl,
            }
            setattr(target, self.project_name + "_spec", spec_opts)
            return target

        if function is None:
            return mark
        return mark(function)
|
|
|
|
|
|
class HookimplMarker:
    """Decorator factory that tags functions as hook implementations.

    Create an instance with a ``project_name`` and use the instance as a
    decorator. A later call to :meth:`PluginManager.register` will find every
    tagged function, provided the :class:`PluginManager` was created with the
    same ``project_name``.
    """

    __slots__ = ("project_name",)

    def __init__(self, project_name: str) -> None:
        self.project_name: Final = project_name

    @overload
    def __call__(
        self,
        function: _F,
        hookwrapper: bool = ...,
        optionalhook: bool = ...,
        tryfirst: bool = ...,
        trylast: bool = ...,
        specname: str | None = ...,
        wrapper: bool = ...,
    ) -> _F:
        ...

    @overload  # noqa: F811
    def __call__(  # noqa: F811
        self,
        function: None = ...,
        hookwrapper: bool = ...,
        optionalhook: bool = ...,
        tryfirst: bool = ...,
        trylast: bool = ...,
        specname: str | None = ...,
        wrapper: bool = ...,
    ) -> Callable[[_F], _F]:
        ...

    def __call__(  # noqa: F811
        self,
        function: _F | None = None,
        hookwrapper: bool = False,
        optionalhook: bool = False,
        tryfirst: bool = False,
        trylast: bool = False,
        specname: str | None = None,
        wrapper: bool = False,
    ) -> _F | Callable[[_F], _F]:
        """Tag ``function`` as a hook implementation, or build a decorator.

        With a function argument, the implementation options are stored on
        that function right away (as the attribute ``<project_name>_impl``)
        so that :meth:`PluginManager.register` can discover it, and the
        function itself is returned.

        Without a function argument, a decorator carrying the given options
        is returned; applying it to a function has the same effect.

        ``optionalhook=True``: a missing matching hook specification is not
        an error (by default it is an error if no matching spec is found).

        ``tryfirst=True``: this implementation runs as early as possible in
        the chain of N hook implementations for a specification.

        ``trylast=True``: this implementation runs as late as possible in
        the chain of N hook implementations.

        ``wrapper=True`` ("new-style hook wrapper"): the implementation needs
        to execute exactly one ``yield``. The code before the ``yield`` runs
        early, before any non-hook-wrapper function is run. The code after
        the ``yield`` runs after all non-hook-wrapper functions have run. The
        ``yield`` receives the result value of the inner calls, or raises the
        exception of inner calls (including earlier hook wrapper calls). The
        return value of the function becomes the return value of the hook,
        and a raised exception becomes the exception of the hook.

        ``hookwrapper=True`` ("old-style hook wrapper"): the implementation
        needs to execute exactly one ``yield``. The code before the ``yield``
        runs early, before any non-hook-wrapper function is run. The code
        after the ``yield`` runs after all non-hook-wrapper functions have
        run. The ``yield`` receives a :class:`_Result` object representing
        the exception or result outcome of the inner calls (including earlier
        hook wrapper calls). This option is mutually exclusive with
        ``wrapper``.

        ``specname``: when provided, it is used instead of the function name
        when matching this hook implementation to a hook specification
        during registration.

        .. versionadded:: 1.2.0
            The ``wrapper`` parameter.
        """

        def mark(target: _F) -> _F:
            impl_opts: _HookImplOpts = {
                "wrapper": wrapper,
                "hookwrapper": hookwrapper,
                "optionalhook": optionalhook,
                "tryfirst": tryfirst,
                "trylast": trylast,
                "specname": specname,
            }
            setattr(target, self.project_name + "_impl", impl_opts)
            return target

        if function is not None:
            return mark(function)
        return mark
|
|
|
|
|
|
def normalize_hookimpl_opts(opts: _HookImplOpts) -> None:
    """Fill in, in place, every hook implementation option missing from ``opts``.

    Options already present are left untouched. The flags default to
    ``False`` and ``specname`` defaults to ``None``.
    """
    fallbacks = (
        ("tryfirst", False),
        ("trylast", False),
        ("wrapper", False),
        ("hookwrapper", False),
        ("optionalhook", False),
        ("specname", None),
    )
    for option, fallback in fallbacks:
        opts.setdefault(option, fallback)  # type: ignore[misc]
|
|
|
|
|
|
# True when ``sys`` exposes ``pypy_version_info``, i.e. when running on PyPy.
# ``varnames`` uses it to also treat "obj" as an implicit instance argument.
_PYPY = hasattr(sys, "pypy_version_info")
|
|
|
|
|
|
def varnames(func: object) -> tuple[tuple[str, ...], tuple[str, ...]]:
|
|
"""Return tuple of positional and keywrord argument names for a function,
|
|
method, class or callable.
|
|
|
|
In case of a class, its ``__init__`` method is considered.
|
|
For methods the ``self`` parameter is not included.
|
|
"""
|
|
if inspect.isclass(func):
|
|
try:
|
|
func = func.__init__
|
|
except AttributeError:
|
|
return (), ()
|
|
elif not inspect.isroutine(func): # callable object?
|
|
try:
|
|
func = getattr(func, "__call__", func)
|
|
except Exception:
|
|
return (), ()
|
|
|
|
try:
|
|
# func MUST be a function or method here or we won't parse any args.
|
|
sig = inspect.signature(
|
|
func.__func__ if inspect.ismethod(func) else func # type:ignore[arg-type]
|
|
)
|
|
except TypeError:
|
|
return (), ()
|
|
|
|
_valid_param_kinds = (
|
|
inspect.Parameter.POSITIONAL_ONLY,
|
|
inspect.Parameter.POSITIONAL_OR_KEYWORD,
|
|
)
|
|
_valid_params = {
|
|
name: param
|
|
for name, param in sig.parameters.items()
|
|
if param.kind in _valid_param_kinds
|
|
}
|
|
args = tuple(_valid_params)
|
|
defaults = (
|
|
tuple(
|
|
param.default
|
|
for param in _valid_params.values()
|
|
if param.default is not param.empty
|
|
)
|
|
or None
|
|
)
|
|
|
|
if defaults:
|
|
index = -len(defaults)
|
|
args, kwargs = args[:index], tuple(args[index:])
|
|
else:
|
|
kwargs = ()
|
|
|
|
# strip any implicit instance arg
|
|
# pypy3 uses "obj" instead of "self" for default dunder methods
|
|
if not _PYPY:
|
|
implicit_names: tuple[str, ...] = ("self",)
|
|
else:
|
|
implicit_names = ("self", "obj")
|
|
if args:
|
|
qualname: str = getattr(func, "__qualname__", "")
|
|
if inspect.ismethod(func) or ("." in qualname and args[0] in implicit_names):
|
|
args = args[1:]
|
|
|
|
return args, kwargs
|
|
|
|
|
|
class _HookRelay:
    """Hook holder object for performing 1:N hook calls where N is the number
    of registered plugins.

    NOTE(review): instances are presumably populated elsewhere with one
    ``_HookCaller`` attribute per hook name - nothing in this class sets them.
    """

    # Declaring ``__dict__`` as the only slot keeps arbitrary attribute
    # assignment possible on instances.
    __slots__ = ("__dict__",)

    if TYPE_CHECKING:
        # Visible to type checkers only: declares that any attribute looked up
        # on the relay is a ``_HookCaller``.

        def __getattr__(self, name: str) -> _HookCaller:
            ...
|
|
|
|
|
|
# Recorded calls of a historic hook: a list of ``(kwargs, result_callback)``
# pairs, where ``result_callback`` may be ``None``.
_CallHistory = List[Tuple[Mapping[str, object], Optional[Callable[[Any], None]]]]
|
|
|
|
|
|
class _HookCaller:
    """Caller for a single named hook and holder of its implementations.

    ``_hookimpls`` is kept in a fixed layout by :meth:`_add_hookimpl`:

    1. non-wrapper implementations: ``trylast``, then plain, then ``tryfirst``
    2. wrapper implementations (old or new style), in the same sub-order

    ``_call_history`` is ``None`` for regular hooks and a list of recorded
    calls for historic hooks.
    """

    __slots__ = (
        "name",
        "spec",
        "_hookexec",
        "_hookimpls",
        "_call_history",
    )

    def __init__(
        self,
        name: str,
        hook_execute: _HookExec,
        specmodule_or_class: _Namespace | None = None,
        spec_opts: _HookSpecOpts | None = None,
    ) -> None:
        """Create the caller for hook ``name``.

        :param name: Name of the hook.
        :param hook_execute: Callback that actually runs the implementations.
        :param specmodule_or_class:
            Namespace containing the hook specification, if already known.
        :param spec_opts:
            Specification options; required when ``specmodule_or_class`` is
            given.
        """
        self.name: Final = name
        self._hookexec: Final = hook_execute
        self._hookimpls: Final[list[HookImpl]] = []
        self._call_history: _CallHistory | None = None
        self.spec: HookSpec | None = None
        if specmodule_or_class is not None:
            assert spec_opts is not None
            self.set_specification(specmodule_or_class, spec_opts)

    def has_spec(self) -> bool:
        """Return whether a specification has been set for this hook."""
        return self.spec is not None

    def set_specification(
        self,
        specmodule_or_class: _Namespace,
        spec_opts: _HookSpecOpts,
    ) -> None:
        """Attach the hook specification found in ``specmodule_or_class``.

        A historic specification also enables call recording.

        :raises ValueError: If a specification has already been set.
        """
        if self.spec is not None:
            raise ValueError(
                f"Hook {self.spec.name!r} is already registered "
                f"within namespace {self.spec.namespace}"
            )
        self.spec = HookSpec(specmodule_or_class, self.name, spec_opts)
        if spec_opts.get("historic"):
            self._call_history = []

    def is_historic(self) -> bool:
        """Return whether calls to this hook are recorded for later replay."""
        return self._call_history is not None

    def _remove_plugin(self, plugin: _Plugin) -> None:
        """Remove the first implementation that belongs to ``plugin``.

        :raises ValueError: If no implementation belongs to ``plugin``.
        """
        for i, method in enumerate(self._hookimpls):
            if method.plugin == plugin:
                # Safe despite deleting while iterating: we return right away.
                del self._hookimpls[i]
                return
        raise ValueError(f"plugin {plugin!r} not found")

    def get_hookimpls(self) -> list[HookImpl]:
        """Return a copy of the list of registered implementations."""
        return self._hookimpls.copy()

    def _add_hookimpl(self, hookimpl: HookImpl) -> None:
        """Add an implementation to the callback chain."""
        # Index of the first wrapper; everything before it is a non-wrapper.
        for i, method in enumerate(self._hookimpls):
            if method.hookwrapper or method.wrapper:
                splitpoint = i
                break
        else:
            splitpoint = len(self._hookimpls)

        # [start, end) is the section of the list the new impl belongs to.
        if hookimpl.hookwrapper or hookimpl.wrapper:
            start, end = splitpoint, len(self._hookimpls)
        else:
            start, end = 0, splitpoint

        if hookimpl.trylast:
            self._hookimpls.insert(start, hookimpl)
        elif hookimpl.tryfirst:
            self._hookimpls.insert(end, hookimpl)
        else:
            # find last non-tryfirst method
            i = end - 1
            while i >= start and self._hookimpls[i].tryfirst:
                i -= 1
            self._hookimpls.insert(i + 1, hookimpl)

    def __repr__(self) -> str:
        return f"<_HookCaller {self.name!r}>"

    def _verify_all_args_are_provided(self, kwargs: Mapping[str, object]) -> None:
        """Warn if ``kwargs`` lacks arguments declared in the specification.

        Does nothing when the hook has no specification.
        """
        # This is written to avoid expensive operations when not needed.
        if self.spec:
            for argname in self.spec.argnames:
                if argname not in kwargs:
                    # Avoid self.spec.argnames - kwargs.keys() - doesn't
                    # preserve order.
                    notincall = ", ".join(
                        repr(missing)
                        for missing in self.spec.argnames
                        if missing not in kwargs
                    )
                    warnings.warn(
                        f"Argument(s) {notincall} which are declared in the hookspec "
                        "cannot be found in this hook call",
                        stacklevel=2,
                    )
                    break

    def __call__(self, **kwargs: object) -> Any:
        """Call the hook with ``kwargs`` and return what the executor returns.

        Must not be used on historic hooks.
        """
        assert (
            not self.is_historic()
        ), "Cannot directly call a historic hook - use call_historic instead."
        self._verify_all_args_are_provided(kwargs)
        firstresult = self.spec.opts.get("firstresult", False) if self.spec else False
        return self._hookexec(self.name, self._hookimpls, kwargs, firstresult)

    def call_historic(
        self,
        result_callback: Callable[[Any], None] | None = None,
        kwargs: Mapping[str, object] | None = None,
    ) -> None:
        """Call the hook with given ``kwargs`` for all registered plugins and
        for all plugins which will be registered afterwards.

        If ``result_callback`` is provided, it will be called for each
        non-``None`` result obtained from a hook implementation.
        """
        assert self._call_history is not None
        kwargs = kwargs or {}
        self._verify_all_args_are_provided(kwargs)
        self._call_history.append((kwargs, result_callback))
        # Historizing hooks don't return results.
        # Remember firstresult isn't compatible with historic.
        res = self._hookexec(self.name, self._hookimpls, kwargs, False)
        if result_callback is None:
            return
        if isinstance(res, list):
            for x in res:
                result_callback(x)

    def call_extra(
        self, methods: Sequence[Callable[..., object]], kwargs: Mapping[str, object]
    ) -> Any:
        """Call the hook with some additional temporarily participating
        methods using the specified ``kwargs`` as call parameters.

        The extra methods take part as plain (neither ``tryfirst`` nor
        ``trylast``) non-wrapper implementations and are not registered
        permanently: they are only added to a copy of the implementation
        list.
        """
        assert (
            not self.is_historic()
        ), "Cannot directly call a historic hook - use call_historic instead."
        self._verify_all_args_are_provided(kwargs)
        opts: _HookImplOpts = {
            "wrapper": False,
            "hookwrapper": False,
            "optionalhook": False,
            "trylast": False,
            "tryfirst": False,
            "specname": None,
        }
        hookimpls = self._hookimpls.copy()
        for method in methods:
            hookimpl = HookImpl(None, "<temp>", method, opts)
            # Find last non-tryfirst nonwrapper method. Wrappers sit at the
            # end of the list and must be skipped too, otherwise the extra
            # method would land behind them and break the list layout.
            i = len(hookimpls) - 1
            while i >= 0 and (
                # Skip wrappers.
                (hookimpls[i].hookwrapper or hookimpls[i].wrapper)
                # Skip tryfirst nonwrappers.
                or hookimpls[i].tryfirst
            ):
                i -= 1
            hookimpls.insert(i + 1, hookimpl)
        firstresult = self.spec.opts.get("firstresult", False) if self.spec else False
        return self._hookexec(self.name, hookimpls, kwargs, firstresult)

    def _maybe_apply_history(self, method: HookImpl) -> None:
        """Apply call history to a new hookimpl if it is marked as historic."""
        if self.is_historic():
            assert self._call_history is not None
            for kwargs, result_callback in self._call_history:
                res = self._hookexec(self.name, [method], kwargs, False)
                if res and result_callback is not None:
                    # XXX: remember firstresult isn't compat with historic
                    assert isinstance(res, list)
                    result_callback(res[0])
|
|
|
|
|
|
class _SubsetHookCaller(_HookCaller):
    """Proxy for another ``_HookCaller`` that hides the implementations of the
    plugins listed in ``remove_plugins``."""

    # This is an unusual class: it inherits from `_HookCaller` so that all of
    # the *code* is reused, while all of the underlying *data* is read from
    # the original hook caller through the properties below.
    #
    # `subset_hook_caller` used to build a full-fledged HookCaller and copy
    # all hookimpls from the original into it. That approach had problems
    # with memory leaks (#346) and historic calls (#347), which is why a
    # proxy is the better design.
    #
    # A `__getattr__`/`__getattribute__` based proxy would be an alternative,
    # but it adds more overhead and is trickier to implement.

    __slots__ = (
        "_orig",
        "_remove_plugins",
    )

    def __init__(self, orig: _HookCaller, remove_plugins: AbstractSet[_Plugin]) -> None:
        # The base initializer is intentionally not called: only the two
        # plain attributes are copied, the rest is proxied via properties.
        self._orig = orig
        self._remove_plugins = remove_plugins
        self.name = orig.name  # type: ignore[misc]
        self._hookexec = orig._hookexec  # type: ignore[misc]

    @property  # type: ignore[misc]
    def _hookimpls(self) -> list[HookImpl]:
        # Recomputed on every access, so the view follows the original.
        excluded = self._remove_plugins
        return [
            candidate
            for candidate in self._orig._hookimpls
            if candidate.plugin not in excluded
        ]

    @property
    def spec(self) -> HookSpec | None:  # type: ignore[override]
        return self._orig.spec

    @property
    def _call_history(self) -> _CallHistory | None:  # type: ignore[override]
        return self._orig._call_history

    def __repr__(self) -> str:
        return f"<_SubsetHookCaller {self.name!r}>"
|
|
|
|
|
|
class HookImpl:
    """A single hook implementation: the function, the plugin it belongs to
    and the options it was marked with.

    The boolean options are copied from the options mapping onto the instance
    as individual attributes.
    """

    __slots__ = (
        "function",
        "argnames",
        "kwargnames",
        "plugin",
        "opts",
        "plugin_name",
        "wrapper",
        "hookwrapper",
        "optionalhook",
        "tryfirst",
        "trylast",
    )

    def __init__(
        self,
        plugin: _Plugin,
        plugin_name: str,
        function: _HookImplFunction[object],
        hook_impl_opts: _HookImplOpts,
    ) -> None:
        """Bind ``function`` to its plugin and implementation options.

        :param plugin: The plugin object providing the implementation.
        :param plugin_name: The name of that plugin.
        :param function: The implementation function itself.
        :param hook_impl_opts:
            The implementation options; every flag read below must be
            present in the mapping.
        """
        self.function: Final = function
        positional, keyword = varnames(self.function)
        self.argnames = positional
        self.kwargnames = keyword

        self.plugin = plugin
        self.opts = hook_impl_opts
        self.plugin_name = plugin_name

        # Mirror the flags as attributes.
        self.wrapper = hook_impl_opts["wrapper"]
        self.hookwrapper = hook_impl_opts["hookwrapper"]
        self.optionalhook = hook_impl_opts["optionalhook"]
        self.tryfirst = hook_impl_opts["tryfirst"]
        self.trylast = hook_impl_opts["trylast"]

    def __repr__(self) -> str:
        return f"<HookImpl plugin_name={self.plugin_name!r}, plugin={self.plugin!r}>"
|
|
|
|
|
|
class HookSpec:
    """A hook specification: the spec function looked up by name in its
    namespace, together with its argument names and options."""

    __slots__ = (
        "namespace",
        "function",
        "name",
        "argnames",
        "kwargnames",
        "opts",
        "warn_on_impl",
    )

    def __init__(self, namespace: _Namespace, name: str, opts: _HookSpecOpts) -> None:
        """Look up attribute ``name`` on ``namespace`` and record it as the spec.

        :param namespace: Module or class that contains the spec function.
        :param name: Name of the hook; also the attribute name looked up.
        :param opts: The specification options.
        :raises AttributeError: If ``namespace`` has no attribute ``name``.
        """
        spec_function: Callable[..., object] = getattr(namespace, name)
        positional, keyword = varnames(spec_function)

        self.namespace = namespace
        self.function: Callable[..., object] = spec_function
        self.name = name
        self.argnames = positional
        self.kwargnames = keyword
        self.opts = opts
        # ``None`` when the option is absent from ``opts``.
        self.warn_on_impl = opts.get("warn_on_impl")
|