diff --git a/src/_pytest/fixtures.py b/src/_pytest/fixtures.py index d56274629..68dec7112 100644 --- a/src/_pytest/fixtures.py +++ b/src/_pytest/fixtures.py @@ -1,7 +1,9 @@ import abc +import copy import dataclasses import functools import inspect +import itertools import os import warnings from collections import defaultdict @@ -12,6 +14,7 @@ from typing import AbstractSet from typing import Any from typing import Callable from typing import cast +from typing import Collection from typing import Dict from typing import Final from typing import final @@ -20,7 +23,9 @@ from typing import Generic from typing import Iterable from typing import Iterator from typing import List +from typing import Mapping from typing import MutableMapping +from typing import MutableSet from typing import NoReturn from typing import Optional from typing import overload @@ -340,6 +345,243 @@ class FuncFixtureInfo: self.names_closure[:] = sorted(closure, key=self.names_closure.index) +# this exceptions is mainly intended for internal signaling +class CyclicDependency(Exception): + pass + + +# this exceptions is mainly intended for internal signaling +class DuplicateDependency(Exception): + pass + + +# keep the interface (mostly) backwards compatible to FuncFixtureInfo. +# the topological sort/graph cycle detection algorithm is based on the paper +# "A dynamic topological sort algorithm for directed acyclic graphs" by +# David J. Pearce and Paul H. J. Kelly +# doi.org/10.1145/1187436.1210590 +# whileydave.com/publications/pk07_jea +class FuncFixtureInfo2: + # TODO: + # - check which sort/sorted calls need to be reversed by default. + # - maybe replace Tuple[str, int] by a NamedTuple? + # - make this a frozen dataclass again? this would make clone more difficult to implement! + __slots__ = ( + "argnames", + "initialnames", + "name2fixturedefs", + "_closure_store", + "_incoming", + "_rank", + "_reuse", + "_outgoing", + ) + + # Fixture names that the item requests directly by function parameters. + argnames: Tuple[str, ...] + # Fixture names that the item immediately requires. These include + # argnames + fixture names specified via usefixtures and via autouse=True in + # fixture definitions. + initialnames: Tuple[str, ...] + # A map from a fixture name in the transitive closure to the FixtureDefs + # matching the name which are applicable to this function. + # There may be multiple overriding fixtures with the same name. The + # sequence is ordered from furthest to closes to the function. + name2fixturedefs: Dict[str, Sequence["FixtureDef[Any]"]] + + # topological rank / does not honor scope + _rank: MutableMapping[Tuple[str, int], int] + # reusable ranks. get populated by removing a graph node + _reuse: MutableSet[int] + # outgoing edges of the graph + _outgoing: MutableMapping[Tuple[str, int], MutableMapping[Tuple[str, int], None]] + # incoming edges of the grpah + _incoming: MutableMapping[Tuple[str, int], MutableMapping[Tuple[str, int], None]] + # backing store for name_closure. this is only needed because several + # objects just take references to it. + _closure_store: List[str] + + def __init__(self, argnames: Tuple[str, ...], initialnames: Tuple[str, ...]): + self.argnames = argnames + self.initialnames = initialnames + # populate while building + self.name2fixturedefs = {} + + self._rank = defaultdict( + lambda: self._reuse.pop() if self._reuse else len(self._rank) + ) + self._outgoing = defaultdict(defaultdict) + self._incoming = defaultdict(defaultdict) + self._reuse = set() + + def clone(self) -> "FuncFixtureInfo2": + clone = FuncFixtureInfo2(self.argnames, copy.copy(self.initialnames)) + clone.name2fixturedefs = self.name2fixturedefs + # maybe reassign ranks to tighten assigned ranks again and to empty reuse + # shallow copy is enough for now + clone._rank = copy.copy(self._rank) + clone._reuse = copy.copy(self._reuse) + # deepcopy these + clone._outgoing = copy.deepcopy(self._outgoing) + clone._outgoing = copy.deepcopy(self._outgoing) + return clone + + # The transitive closure of the fixture names that the item requires. + # Note: can't include dynamic dependencies (`request.getfixturevalue` calls). + @property + def names_closure(self) -> List[str]: + self._closure_store[:] = [ + name for name, _ in self.name_idx_closure(reverse=True) + ] + return self._closure_store + + def remove_fixture(self, node: Tuple[str, int]) -> None: + # remove all edges from/to the node, remove the node and mark the rank + # of the node for reuse + for next in self._outgoing[node]: + del self._incoming[next][node] + for previous in self._incoming[node]: + del self._outgoing[previous][node] + del self._incoming[node] + del self._outgoing[node] + self._reuse.add(self._rank.pop(node)) + + def name_idx_closure(self, reverse: bool = False) -> Sequence[Tuple[str, int]]: + # closure in topological sort order / scope is not honored + return sorted(self._rank, key=self._rank.__getitem__) + + def fixturedef_closure(self, reverse: bool = False) -> Sequence["FixtureDef[Any]"]: + # closure in topogical sort order and proper scope order + # unless the closure already was ordered by scope this is a different + # topocoligal sort order! + return sorted( + (self.name2fixturedefs[name][idx] for name, idx in self.name_idx_closure()), + key=lambda d: d._scope, + ) + + def add_node(self, node: Tuple[str, int]): + # triggers insertion if the node does not already exist + self._rank[node] + + def add_dependency(self, source: Tuple[str, int], dest: Tuple[str, int]) -> None: + if source == dest: + raise CyclicDependency + + if dest in self._outgoing[source] or source in self._incoming[dest]: + # if the invariant holds only one check is necessary since _incoming + # is the adjunct to _outgoing and so they mirror each other. + raise DuplicateDependency + + # apprently nested default dicts behave a little funny. + self._outgoing[source][dest] = None + self._incoming[dest][source] = None + + if self._rank[source] > self._rank[dest]: + # access to the nodes triggers insertion of the nodes if they do not exist already. + # these are kept whether the edge is inserted or not. Similarly, empty entries in + # _outgoing and _incoming are kept. To remove these orphaned/unconnected entries call `prune` + try: + delta_f = [ + *self._depth_first_traversal( + dest, self._outgoing, self._fwd_cond(source) + ) + ] + except CyclicDependency: + self.remove_dependency(source, dest) + raise + else: + delta_b = [ + *self._depth_first_traversal( + source, self._incoming, self._bwd_cond(dest) + ) + ] + delta_f.sort(key=self._rank.__getitem__) + delta_b.sort(key=self._rank.__getitem__) + + ranks = sorted( + self._rank[node] for node in itertools.chain(delta_f, delta_b) + ) + + for node, rank in zip(itertools.chain(delta_b, delta_f), ranks): + self._rank[node] = rank + + def remove_dependency(self, source: Tuple[str, int], dest: Tuple[str, int]) -> None: + del self._outgoing[source][dest] + del self._incoming[dest][source] + + def _fwd_cond(self, node: Tuple[str, int]) -> Callable[[Tuple[str, int]], bool]: + bound = self._rank[node] + + def predicate(next: Tuple[str, int]) -> bool: + rank = self._rank[next] + if rank == bound: + raise CyclicDependency(node, next) + return rank < bound + + return predicate + + def _bwd_cond(self, node: Tuple[str, int]) -> Callable[[Tuple[str, int]], bool]: + bound = self._rank[node] + + def predicate(next: Tuple[str, int]) -> bool: + return self._rank[next] > bound + + return predicate + + def _prune(self, roots: Collection[Tuple[str, int]]): + # remove all nodes not connected to any root + while unconnected := { + node + for node, dests in self._incoming.items() + if node not in roots and not dests + }: + for node in unconnected: + self.remove_fixture(node) + for node in { + node + for node in self._rank + if node not in self._outgoing and node not in self._incoming + }: + self.remove_fixture(node) + + def _compress(self): + for i, k in enumerate(self.name_idx_closure()): + self._rank[k] = i + self._reuse.clear() + + def prune(self): + # usage index -1 mirrors current behaviour which has some bugs concerning autouse + self._prune({(f, -1) for f in self.initialnames}) + + # keep the old interface aroud + def prune_dependency_tree(self): + self.prune() + # update the names_closure store + _ = self.names_closure + + @staticmethod + def _depth_first_traversal( + start: Tuple[str, int], + edges: Mapping[Tuple[str, int], Mapping[Tuple[str, int], None]], + valid: Callable[[Tuple[str, int]], bool] = lambda _: True, + ) -> Iterator[Tuple[str, int]]: + stack: list[Tuple[str, int]] = [start] + visited: set[Tuple[str, int]] = set() + + while stack: + node = stack.pop() + + if node in visited: + continue + visited.add(node) + yield node + + # to keep the proper order we need to iterate in reverse + for next in reversed(edges[node].keys()): + if valid(next) and next not in visited: + stack.append(next) + + class FixtureRequest(abc.ABC): """The type of the ``request`` fixture. @@ -1493,6 +1735,36 @@ class FixtureManager: return FuncFixtureInfo(argnames, initialnames, names_closure, arg2fixturedefs) + def getfixtureinfo2( + self, + node: nodes.Item, + func: Callable[..., object], + cls: Optional[type], + funcargs: bool = True, + ) -> FuncFixtureInfo2: + """See above""" + if funcargs and not getattr(node, "nofuncargs", False): + argnames = getfuncargnames(func, name=node.name, cls=cls) + else: + argnames = () + + usefixtures = tuple( + arg for mark in node.iter_markers(name="usefixtures") for arg in mark.args + ) + autousefixtures = tuple(self._getautousenames(node.nodeid)) + # keep the previous order. + # #in most cases this results in a suboptimal order later on + initialnames = autousefixtures + usefixtures + argnames + # unique initialnames >=py3.6 / optimize later + initialnames = tuple(dict.fromkeys(initialnames)) + # fixturenames to skip because they would be replaced later on anyway + # convert to set for faster lookup + skipnames = set(_get_direct_parametrize_args(node)) + + info2 = FuncFixtureInfo2(argnames=argnames, initialnames=initialnames) + self.buildfixtureclosure(info2, node.nodeid, skipnames) + return info2 + def pytest_plugin_registered(self, plugin: _PluggyPlugin) -> None: nodeid = None try: @@ -1570,6 +1842,91 @@ class FixtureManager: fixturenames_closure.sort(key=sort_by_scope, reverse=True) return fixturenames_closure, arg2fixturedefs + def buildfixtureclosure( + self, + info: FuncFixtureInfo2, + nodeid: str, + ignore: Collection[str], + ) -> None: + # TODO: + # - reword last sentence + # - rethink/get advice for error handling and reporting + + # Walk the fixture definitions starting from the initalnames given by the + # info object to build the dependency DAG (directed acyclic graph). + # The reversed topological sort of the DAG is equilvalent to the previous + # fixture_closure but is not ordered by scope. But, because the algorithm + # enforces the invariant that a fixtures can only depend on fixtures of + # the same or higher scope, sorting by scope produces another topological + # sort. + # Note that the insertion algortihm is not stable concerning the scope ordering + cache: Dict[str, Sequence[FixtureDef[Any]]] = info.name2fixturedefs + iter_stack: List[Iterator[str]] = [iter(info.initialnames)] + parent_stack: List[Tuple[str, int]] = [] + + def retrieve(name: str) -> Sequence[FixtureDef[Any]] | None: + try: + return cache.get(name, None) + except KeyError: + defs = self.getfixturedefs(name, nodeid) + if defs: + cache[name] = defs + return defs + else: + return None + + while iter_stack: + arg = next(iter_stack[-1], None) + if arg: + if arg in ignore: + continue + defs = retrieve(arg) + if not defs: + # for now continue if no definitions are found + # this should be guarded against by giving appropriate ignores + continue + if parent_stack: + parent = parent_stack[-1] + # entry unconditionally exists at this point + parent_scope = cast(Sequence[FixtureDef[Any]], retrieve(parent[0]))[ + parent[1] + ]._scope + for i in range(-1, -len(defs) - 1, -1): + try: + info.add_dependency(parent, (arg, i)) + except CyclicDependency: + continue + except DuplicateDependency: + break + else: + if defs[i]._scope < parent_scope: + info.remove_dependency(parent, (arg, i)) + raise Exception("Scope violation: ...") + # check if the fixture is already in the stack to bound + # the stack growth. + # this check is not a sufficient condition for termination + # of the algorithm. + if (arg, i) not in parent_stack: + parent_stack.append((arg, i)) + iter_stack.append(iter(defs[i].argnames)) + break + else: + raise Exception("Cycle Detected: ...") + else: + parent_stack.append((arg, -1)) + iter_stack.append(iter(defs[-1].argnames)) + # make sure the initalnames are actually inserted even if they + # have no dependencies + info.add_node((arg, -1)) + else: + # remove the exhausted iterator and backtrack + iter_stack.pop() + if parent_stack: + parent_stack.pop() + # remove potenially orphened nodes from failed insertions + info.prune() + info._compress() + def pytest_generate_tests(self, metafunc: "Metafunc") -> None: """Generate new tests based on parametrized fixtures used by the given metafunc"""