add classes and methods for alternative closure calculation
This commit is contained in:
parent
cdddd6d695
commit
3f91fb839d
|
@ -1,7 +1,9 @@
|
|||
import abc
|
||||
import copy
|
||||
import dataclasses
|
||||
import functools
|
||||
import inspect
|
||||
import itertools
|
||||
import os
|
||||
import warnings
|
||||
from collections import defaultdict
|
||||
|
@ -12,6 +14,7 @@ from typing import AbstractSet
|
|||
from typing import Any
|
||||
from typing import Callable
|
||||
from typing import cast
|
||||
from typing import Collection
|
||||
from typing import Dict
|
||||
from typing import Final
|
||||
from typing import final
|
||||
|
@ -20,7 +23,9 @@ from typing import Generic
|
|||
from typing import Iterable
|
||||
from typing import Iterator
|
||||
from typing import List
|
||||
from typing import Mapping
|
||||
from typing import MutableMapping
|
||||
from typing import MutableSet
|
||||
from typing import NoReturn
|
||||
from typing import Optional
|
||||
from typing import overload
|
||||
|
@ -340,6 +345,243 @@ class FuncFixtureInfo:
|
|||
self.names_closure[:] = sorted(closure, key=self.names_closure.index)
|
||||
|
||||
|
||||
# this exceptions is mainly intended for internal signaling
|
||||
class CyclicDependency(Exception):
|
||||
pass
|
||||
|
||||
|
||||
# this exceptions is mainly intended for internal signaling
|
||||
class DuplicateDependency(Exception):
|
||||
pass
|
||||
|
||||
|
||||
# keep the interface (mostly) backwards compatible to FuncFixtureInfo.
|
||||
# the topological sort/graph cycle detection algorithm is based on the paper
|
||||
# "A dynamic topological sort algorithm for directed acyclic graphs" by
|
||||
# David J. Pearce and Paul H. J. Kelly
|
||||
# doi.org/10.1145/1187436.1210590
|
||||
# whileydave.com/publications/pk07_jea
|
||||
class FuncFixtureInfo2:
|
||||
# TODO:
|
||||
# - check which sort/sorted calls need to be reversed by default.
|
||||
# - maybe replace Tuple[str, int] by a NamedTuple?
|
||||
# - make this a frozen dataclass again? this would make clone more difficult to implement!
|
||||
__slots__ = (
|
||||
"argnames",
|
||||
"initialnames",
|
||||
"name2fixturedefs",
|
||||
"_closure_store",
|
||||
"_incoming",
|
||||
"_rank",
|
||||
"_reuse",
|
||||
"_outgoing",
|
||||
)
|
||||
|
||||
# Fixture names that the item requests directly by function parameters.
|
||||
argnames: Tuple[str, ...]
|
||||
# Fixture names that the item immediately requires. These include
|
||||
# argnames + fixture names specified via usefixtures and via autouse=True in
|
||||
# fixture definitions.
|
||||
initialnames: Tuple[str, ...]
|
||||
# A map from a fixture name in the transitive closure to the FixtureDefs
|
||||
# matching the name which are applicable to this function.
|
||||
# There may be multiple overriding fixtures with the same name. The
|
||||
# sequence is ordered from furthest to closes to the function.
|
||||
name2fixturedefs: Dict[str, Sequence["FixtureDef[Any]"]]
|
||||
|
||||
# topological rank / does not honor scope
|
||||
_rank: MutableMapping[Tuple[str, int], int]
|
||||
# reusable ranks. get populated by removing a graph node
|
||||
_reuse: MutableSet[int]
|
||||
# outgoing edges of the graph
|
||||
_outgoing: MutableMapping[Tuple[str, int], MutableMapping[Tuple[str, int], None]]
|
||||
# incoming edges of the grpah
|
||||
_incoming: MutableMapping[Tuple[str, int], MutableMapping[Tuple[str, int], None]]
|
||||
# backing store for name_closure. this is only needed because several
|
||||
# objects just take references to it.
|
||||
_closure_store: List[str]
|
||||
|
||||
def __init__(self, argnames: Tuple[str, ...], initialnames: Tuple[str, ...]):
|
||||
self.argnames = argnames
|
||||
self.initialnames = initialnames
|
||||
# populate while building
|
||||
self.name2fixturedefs = {}
|
||||
|
||||
self._rank = defaultdict(
|
||||
lambda: self._reuse.pop() if self._reuse else len(self._rank)
|
||||
)
|
||||
self._outgoing = defaultdict(defaultdict)
|
||||
self._incoming = defaultdict(defaultdict)
|
||||
self._reuse = set()
|
||||
|
||||
def clone(self) -> "FuncFixtureInfo2":
|
||||
clone = FuncFixtureInfo2(self.argnames, copy.copy(self.initialnames))
|
||||
clone.name2fixturedefs = self.name2fixturedefs
|
||||
# maybe reassign ranks to tighten assigned ranks again and to empty reuse
|
||||
# shallow copy is enough for now
|
||||
clone._rank = copy.copy(self._rank)
|
||||
clone._reuse = copy.copy(self._reuse)
|
||||
# deepcopy these
|
||||
clone._outgoing = copy.deepcopy(self._outgoing)
|
||||
clone._outgoing = copy.deepcopy(self._outgoing)
|
||||
return clone
|
||||
|
||||
# The transitive closure of the fixture names that the item requires.
|
||||
# Note: can't include dynamic dependencies (`request.getfixturevalue` calls).
|
||||
@property
|
||||
def names_closure(self) -> List[str]:
|
||||
self._closure_store[:] = [
|
||||
name for name, _ in self.name_idx_closure(reverse=True)
|
||||
]
|
||||
return self._closure_store
|
||||
|
||||
def remove_fixture(self, node: Tuple[str, int]) -> None:
|
||||
# remove all edges from/to the node, remove the node and mark the rank
|
||||
# of the node for reuse
|
||||
for next in self._outgoing[node]:
|
||||
del self._incoming[next][node]
|
||||
for previous in self._incoming[node]:
|
||||
del self._outgoing[previous][node]
|
||||
del self._incoming[node]
|
||||
del self._outgoing[node]
|
||||
self._reuse.add(self._rank.pop(node))
|
||||
|
||||
def name_idx_closure(self, reverse: bool = False) -> Sequence[Tuple[str, int]]:
|
||||
# closure in topological sort order / scope is not honored
|
||||
return sorted(self._rank, key=self._rank.__getitem__)
|
||||
|
||||
def fixturedef_closure(self, reverse: bool = False) -> Sequence["FixtureDef[Any]"]:
|
||||
# closure in topogical sort order and proper scope order
|
||||
# unless the closure already was ordered by scope this is a different
|
||||
# topocoligal sort order!
|
||||
return sorted(
|
||||
(self.name2fixturedefs[name][idx] for name, idx in self.name_idx_closure()),
|
||||
key=lambda d: d._scope,
|
||||
)
|
||||
|
||||
def add_node(self, node: Tuple[str, int]):
|
||||
# triggers insertion if the node does not already exist
|
||||
self._rank[node]
|
||||
|
||||
def add_dependency(self, source: Tuple[str, int], dest: Tuple[str, int]) -> None:
|
||||
if source == dest:
|
||||
raise CyclicDependency
|
||||
|
||||
if dest in self._outgoing[source] or source in self._incoming[dest]:
|
||||
# if the invariant holds only one check is necessary since _incoming
|
||||
# is the adjunct to _outgoing and so they mirror each other.
|
||||
raise DuplicateDependency
|
||||
|
||||
# apprently nested default dicts behave a little funny.
|
||||
self._outgoing[source][dest] = None
|
||||
self._incoming[dest][source] = None
|
||||
|
||||
if self._rank[source] > self._rank[dest]:
|
||||
# access to the nodes triggers insertion of the nodes if they do not exist already.
|
||||
# these are kept whether the edge is inserted or not. Similarly, empty entries in
|
||||
# _outgoing and _incoming are kept. To remove these orphaned/unconnected entries call `prune`
|
||||
try:
|
||||
delta_f = [
|
||||
*self._depth_first_traversal(
|
||||
dest, self._outgoing, self._fwd_cond(source)
|
||||
)
|
||||
]
|
||||
except CyclicDependency:
|
||||
self.remove_dependency(source, dest)
|
||||
raise
|
||||
else:
|
||||
delta_b = [
|
||||
*self._depth_first_traversal(
|
||||
source, self._incoming, self._bwd_cond(dest)
|
||||
)
|
||||
]
|
||||
delta_f.sort(key=self._rank.__getitem__)
|
||||
delta_b.sort(key=self._rank.__getitem__)
|
||||
|
||||
ranks = sorted(
|
||||
self._rank[node] for node in itertools.chain(delta_f, delta_b)
|
||||
)
|
||||
|
||||
for node, rank in zip(itertools.chain(delta_b, delta_f), ranks):
|
||||
self._rank[node] = rank
|
||||
|
||||
def remove_dependency(self, source: Tuple[str, int], dest: Tuple[str, int]) -> None:
|
||||
del self._outgoing[source][dest]
|
||||
del self._incoming[dest][source]
|
||||
|
||||
def _fwd_cond(self, node: Tuple[str, int]) -> Callable[[Tuple[str, int]], bool]:
|
||||
bound = self._rank[node]
|
||||
|
||||
def predicate(next: Tuple[str, int]) -> bool:
|
||||
rank = self._rank[next]
|
||||
if rank == bound:
|
||||
raise CyclicDependency(node, next)
|
||||
return rank < bound
|
||||
|
||||
return predicate
|
||||
|
||||
def _bwd_cond(self, node: Tuple[str, int]) -> Callable[[Tuple[str, int]], bool]:
|
||||
bound = self._rank[node]
|
||||
|
||||
def predicate(next: Tuple[str, int]) -> bool:
|
||||
return self._rank[next] > bound
|
||||
|
||||
return predicate
|
||||
|
||||
def _prune(self, roots: Collection[Tuple[str, int]]):
|
||||
# remove all nodes not connected to any root
|
||||
while unconnected := {
|
||||
node
|
||||
for node, dests in self._incoming.items()
|
||||
if node not in roots and not dests
|
||||
}:
|
||||
for node in unconnected:
|
||||
self.remove_fixture(node)
|
||||
for node in {
|
||||
node
|
||||
for node in self._rank
|
||||
if node not in self._outgoing and node not in self._incoming
|
||||
}:
|
||||
self.remove_fixture(node)
|
||||
|
||||
def _compress(self):
|
||||
for i, k in enumerate(self.name_idx_closure()):
|
||||
self._rank[k] = i
|
||||
self._reuse.clear()
|
||||
|
||||
def prune(self):
|
||||
# usage index -1 mirrors current behaviour which has some bugs concerning autouse
|
||||
self._prune({(f, -1) for f in self.initialnames})
|
||||
|
||||
# keep the old interface aroud
|
||||
def prune_dependency_tree(self):
|
||||
self.prune()
|
||||
# update the names_closure store
|
||||
_ = self.names_closure
|
||||
|
||||
@staticmethod
|
||||
def _depth_first_traversal(
|
||||
start: Tuple[str, int],
|
||||
edges: Mapping[Tuple[str, int], Mapping[Tuple[str, int], None]],
|
||||
valid: Callable[[Tuple[str, int]], bool] = lambda _: True,
|
||||
) -> Iterator[Tuple[str, int]]:
|
||||
stack: list[Tuple[str, int]] = [start]
|
||||
visited: set[Tuple[str, int]] = set()
|
||||
|
||||
while stack:
|
||||
node = stack.pop()
|
||||
|
||||
if node in visited:
|
||||
continue
|
||||
visited.add(node)
|
||||
yield node
|
||||
|
||||
# to keep the proper order we need to iterate in reverse
|
||||
for next in reversed(edges[node].keys()):
|
||||
if valid(next) and next not in visited:
|
||||
stack.append(next)
|
||||
|
||||
|
||||
class FixtureRequest(abc.ABC):
|
||||
"""The type of the ``request`` fixture.
|
||||
|
||||
|
@ -1493,6 +1735,36 @@ class FixtureManager:
|
|||
|
||||
return FuncFixtureInfo(argnames, initialnames, names_closure, arg2fixturedefs)
|
||||
|
||||
def getfixtureinfo2(
|
||||
self,
|
||||
node: nodes.Item,
|
||||
func: Callable[..., object],
|
||||
cls: Optional[type],
|
||||
funcargs: bool = True,
|
||||
) -> FuncFixtureInfo2:
|
||||
"""See above"""
|
||||
if funcargs and not getattr(node, "nofuncargs", False):
|
||||
argnames = getfuncargnames(func, name=node.name, cls=cls)
|
||||
else:
|
||||
argnames = ()
|
||||
|
||||
usefixtures = tuple(
|
||||
arg for mark in node.iter_markers(name="usefixtures") for arg in mark.args
|
||||
)
|
||||
autousefixtures = tuple(self._getautousenames(node.nodeid))
|
||||
# keep the previous order.
|
||||
# #in most cases this results in a suboptimal order later on
|
||||
initialnames = autousefixtures + usefixtures + argnames
|
||||
# unique initialnames >=py3.6 / optimize later
|
||||
initialnames = tuple(dict.fromkeys(initialnames))
|
||||
# fixturenames to skip because they would be replaced later on anyway
|
||||
# convert to set for faster lookup
|
||||
skipnames = set(_get_direct_parametrize_args(node))
|
||||
|
||||
info2 = FuncFixtureInfo2(argnames=argnames, initialnames=initialnames)
|
||||
self.buildfixtureclosure(info2, node.nodeid, skipnames)
|
||||
return info2
|
||||
|
||||
def pytest_plugin_registered(self, plugin: _PluggyPlugin) -> None:
|
||||
nodeid = None
|
||||
try:
|
||||
|
@ -1570,6 +1842,91 @@ class FixtureManager:
|
|||
fixturenames_closure.sort(key=sort_by_scope, reverse=True)
|
||||
return fixturenames_closure, arg2fixturedefs
|
||||
|
||||
def buildfixtureclosure(
|
||||
self,
|
||||
info: FuncFixtureInfo2,
|
||||
nodeid: str,
|
||||
ignore: Collection[str],
|
||||
) -> None:
|
||||
# TODO:
|
||||
# - reword last sentence
|
||||
# - rethink/get advice for error handling and reporting
|
||||
|
||||
# Walk the fixture definitions starting from the initalnames given by the
|
||||
# info object to build the dependency DAG (directed acyclic graph).
|
||||
# The reversed topological sort of the DAG is equilvalent to the previous
|
||||
# fixture_closure but is not ordered by scope. But, because the algorithm
|
||||
# enforces the invariant that a fixtures can only depend on fixtures of
|
||||
# the same or higher scope, sorting by scope produces another topological
|
||||
# sort.
|
||||
# Note that the insertion algortihm is not stable concerning the scope ordering
|
||||
cache: Dict[str, Sequence[FixtureDef[Any]]] = info.name2fixturedefs
|
||||
iter_stack: List[Iterator[str]] = [iter(info.initialnames)]
|
||||
parent_stack: List[Tuple[str, int]] = []
|
||||
|
||||
def retrieve(name: str) -> Sequence[FixtureDef[Any]] | None:
|
||||
try:
|
||||
return cache.get(name, None)
|
||||
except KeyError:
|
||||
defs = self.getfixturedefs(name, nodeid)
|
||||
if defs:
|
||||
cache[name] = defs
|
||||
return defs
|
||||
else:
|
||||
return None
|
||||
|
||||
while iter_stack:
|
||||
arg = next(iter_stack[-1], None)
|
||||
if arg:
|
||||
if arg in ignore:
|
||||
continue
|
||||
defs = retrieve(arg)
|
||||
if not defs:
|
||||
# for now continue if no definitions are found
|
||||
# this should be guarded against by giving appropriate ignores
|
||||
continue
|
||||
if parent_stack:
|
||||
parent = parent_stack[-1]
|
||||
# entry unconditionally exists at this point
|
||||
parent_scope = cast(Sequence[FixtureDef[Any]], retrieve(parent[0]))[
|
||||
parent[1]
|
||||
]._scope
|
||||
for i in range(-1, -len(defs) - 1, -1):
|
||||
try:
|
||||
info.add_dependency(parent, (arg, i))
|
||||
except CyclicDependency:
|
||||
continue
|
||||
except DuplicateDependency:
|
||||
break
|
||||
else:
|
||||
if defs[i]._scope < parent_scope:
|
||||
info.remove_dependency(parent, (arg, i))
|
||||
raise Exception("Scope violation: ...")
|
||||
# check if the fixture is already in the stack to bound
|
||||
# the stack growth.
|
||||
# this check is not a sufficient condition for termination
|
||||
# of the algorithm.
|
||||
if (arg, i) not in parent_stack:
|
||||
parent_stack.append((arg, i))
|
||||
iter_stack.append(iter(defs[i].argnames))
|
||||
break
|
||||
else:
|
||||
raise Exception("Cycle Detected: ...")
|
||||
else:
|
||||
parent_stack.append((arg, -1))
|
||||
iter_stack.append(iter(defs[-1].argnames))
|
||||
# make sure the initalnames are actually inserted even if they
|
||||
# have no dependencies
|
||||
info.add_node((arg, -1))
|
||||
else:
|
||||
# remove the exhausted iterator and backtrack
|
||||
iter_stack.pop()
|
||||
if parent_stack:
|
||||
parent_stack.pop()
|
||||
# remove potenially orphened nodes from failed insertions
|
||||
info.prune()
|
||||
info._compress()
|
||||
|
||||
def pytest_generate_tests(self, metafunc: "Metafunc") -> None:
|
||||
"""Generate new tests based on parametrized fixtures used by the given metafunc"""
|
||||
|
||||
|
|
Loading…
Reference in New Issue