This commit is contained in:
Janick Gerstenberger 2024-01-08 11:10:10 +08:00 committed by GitHub
commit 089d6f33b5
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 375 additions and 7 deletions

View File

@ -265,7 +265,7 @@ class DoctestItem(Item):
# Stuff needed for fixture support. # Stuff needed for fixture support.
self.obj = None self.obj = None
fm = self.session._fixturemanager fm = self.session._fixturemanager
fixtureinfo = fm.getfixtureinfo(node=self, func=None, cls=None) fixtureinfo = fm.getfixtureinfo2(node=self, func=None, cls=None)
self._fixtureinfo = fixtureinfo self._fixtureinfo = fixtureinfo
self.fixturenames = fixtureinfo.names_closure self.fixturenames = fixtureinfo.names_closure
self._initrequest() self._initrequest()

View File

@ -1,7 +1,9 @@
import abc import abc
import copy
import dataclasses import dataclasses
import functools import functools
import inspect import inspect
import itertools
import os import os
import warnings import warnings
from collections import defaultdict from collections import defaultdict
@ -12,6 +14,7 @@ from typing import AbstractSet
from typing import Any from typing import Any
from typing import Callable from typing import Callable
from typing import cast from typing import cast
from typing import Collection
from typing import Dict from typing import Dict
from typing import Final from typing import Final
from typing import final from typing import final
@ -20,7 +23,9 @@ from typing import Generic
from typing import Iterable from typing import Iterable
from typing import Iterator from typing import Iterator
from typing import List from typing import List
from typing import Mapping
from typing import MutableMapping from typing import MutableMapping
from typing import MutableSet
from typing import NoReturn from typing import NoReturn
from typing import Optional from typing import Optional
from typing import overload from typing import overload
@ -342,6 +347,248 @@ class FuncFixtureInfo:
self.names_closure[:] = sorted(closure, key=self.names_closure.index) self.names_closure[:] = sorted(closure, key=self.names_closure.index)
# this exceptions is mainly intended for internal signaling
class CyclicDependency(Exception):
pass
# this exceptions is mainly intended for internal signaling
class DuplicateDependency(Exception):
pass
# keep the interface (mostly) backwards compatible to FuncFixtureInfo.
# the topological sort/graph cycle detection algorithm is based on the paper
# "A dynamic topological sort algorithm for directed acyclic graphs" by
# David J. Pearce and Paul H. J. Kelly
# doi.org/10.1145/1187436.1210590
# whileydave.com/publications/pk07_jea
class FuncFixtureInfo2:
# TODO:
# - check which sort/sorted calls need to be reversed by default.
# - maybe replace Tuple[str, int] by a NamedTuple?
# - make this a frozen dataclass again? this would make clone more difficult to implement!
# - special case 'request' and nodes without fixture definitions in fixturedef_closure
__slots__ = (
"argnames",
"initialnames",
"name2fixturedefs",
"_closure_store",
"_incoming",
"_rank",
"_reuse",
"_outgoing",
)
# Fixture names that the item requests directly by function parameters.
argnames: Tuple[str, ...]
# Fixture names that the item immediately requires. These include
# argnames + fixture names specified via usefixtures and via autouse=True in
# fixture definitions.
initialnames: Tuple[str, ...]
# A map from a fixture name in the transitive closure to the FixtureDefs
# matching the name which are applicable to this function.
# There may be multiple overriding fixtures with the same name. The
# sequence is ordered from furthest to closes to the function.
name2fixturedefs: Dict[str, Sequence["FixtureDef[Any]"]]
# topological rank / does not honor scope
_rank: MutableMapping[Tuple[str, int], int]
# reusable ranks. get populated by removing a graph node
_reuse: MutableSet[int]
# outgoing edges of the graph
_outgoing: MutableMapping[Tuple[str, int], MutableMapping[Tuple[str, int], None]]
# incoming edges of the grpah
_incoming: MutableMapping[Tuple[str, int], MutableMapping[Tuple[str, int], None]]
# backing store for name_closure. this is only needed because several
# objects just take references to it.
_closure_store: List[str]
def __init__(self, argnames: Tuple[str, ...], initialnames: Tuple[str, ...]):
self.argnames = argnames
self.initialnames = initialnames
# populate while building
self.name2fixturedefs = {}
self._rank = defaultdict(
lambda: self._reuse.pop() if self._reuse else len(self._rank)
)
self._outgoing = defaultdict(defaultdict)
self._incoming = defaultdict(defaultdict)
self._reuse = set()
self._closure_store = []
def clone(self) -> "FuncFixtureInfo2":
clone = FuncFixtureInfo2(self.argnames, copy.copy(self.initialnames))
clone.name2fixturedefs = self.name2fixturedefs
# maybe reassign ranks to tighten assigned ranks again and to empty reuse
# shallow copy is enough for now
clone._rank = copy.copy(self._rank)
clone._reuse = copy.copy(self._reuse)
# deepcopy these
clone._outgoing = copy.deepcopy(self._outgoing)
clone._outgoing = copy.deepcopy(self._outgoing)
return clone
# The transitive closure of the fixture names that the item requires.
# Note: can't include dynamic dependencies (`request.getfixturevalue` calls).
@property
def names_closure(self) -> List[str]:
self._closure_store[:] = [
name for name, _ in self.name_idx_closure(reverse=True)
]
return self._closure_store
def remove_fixture(self, node: Tuple[str, int]) -> None:
# remove all edges from/to the node, remove the node and mark the rank
# of the node for reuse
for next in self._outgoing[node]:
del self._incoming[next][node]
for previous in self._incoming[node]:
del self._outgoing[previous][node]
del self._incoming[node]
del self._outgoing[node]
self._reuse.add(self._rank.pop(node))
def name_idx_closure(self, reverse: bool = False) -> Sequence[Tuple[str, int]]:
# closure in topological sort order / scope is not honored
return sorted(self._rank, key=self._rank.__getitem__, reverse=reverse)
def fixturedef_closure(self, reverse: bool = False) -> Sequence["FixtureDef[Any]"]:
# closure in topogical sort order and proper scope order
# unless the closure already was ordered by scope this is a different
# topocoligal sort order!
return sorted(
(self.name2fixturedefs[name][idx] for name, idx in self.name_idx_closure()),
key=lambda d: d._scope,
reverse=reverse,
)
def add_node(self, node: Tuple[str, int]):
# triggers insertion if the node does not already exist
self._rank[node]
def add_dependency(self, source: Tuple[str, int], dest: Tuple[str, int]) -> None:
if source == dest:
raise CyclicDependency
if dest in self._outgoing[source] or source in self._incoming[dest]:
# if the invariant holds only one check is necessary since _incoming
# is the adjunct to _outgoing and so they mirror each other.
raise DuplicateDependency
# apprently nested default dicts behave a little funny.
self._outgoing[source][dest] = None
self._incoming[dest][source] = None
if self._rank[source] > self._rank[dest]:
# access to the nodes triggers insertion of the nodes if they do not exist already.
# these are kept whether the edge is inserted or not. Similarly, empty entries in
# _outgoing and _incoming are kept. To remove these orphaned/unconnected entries call `prune`
try:
delta_f = [
*self._depth_first_traversal(
dest, self._outgoing, self._fwd_cond(source)
)
]
except CyclicDependency:
self.remove_dependency(source, dest)
raise
else:
delta_b = [
*self._depth_first_traversal(
source, self._incoming, self._bwd_cond(dest)
)
]
delta_f.sort(key=self._rank.__getitem__)
delta_b.sort(key=self._rank.__getitem__)
ranks = sorted(
self._rank[node] for node in itertools.chain(delta_f, delta_b)
)
for node, rank in zip(itertools.chain(delta_b, delta_f), ranks):
self._rank[node] = rank
def remove_dependency(self, source: Tuple[str, int], dest: Tuple[str, int]) -> None:
del self._outgoing[source][dest]
del self._incoming[dest][source]
def _fwd_cond(self, node: Tuple[str, int]) -> Callable[[Tuple[str, int]], bool]:
bound = self._rank[node]
def predicate(next: Tuple[str, int]) -> bool:
rank = self._rank[next]
if rank == bound:
raise CyclicDependency(node, next)
return rank < bound
return predicate
def _bwd_cond(self, node: Tuple[str, int]) -> Callable[[Tuple[str, int]], bool]:
bound = self._rank[node]
def predicate(next: Tuple[str, int]) -> bool:
return self._rank[next] > bound
return predicate
def _prune(self, roots: Collection[Tuple[str, int]]):
# remove all nodes not connected to any root
while unconnected := {
node
for node, dests in self._incoming.items()
if node not in roots and not dests
}:
for node in unconnected:
self.remove_fixture(node)
for node in {
node
for node in self._rank
if node not in self._outgoing
and node not in self._incoming
and node not in roots
}:
self.remove_fixture(node)
def _compress(self):
for i, k in enumerate(self.name_idx_closure()):
self._rank[k] = i
self._reuse.clear()
def prune(self):
# usage index -1 mirrors current behaviour which has some bugs concerning autouse
self._prune({(f, -1) for f in self.initialnames})
# keep the old interface aroud
def prune_dependency_tree(self):
self.prune()
# update the names_closure store
_ = self.names_closure
@staticmethod
def _depth_first_traversal(
start: Tuple[str, int],
edges: Mapping[Tuple[str, int], Mapping[Tuple[str, int], None]],
valid: Callable[[Tuple[str, int]], bool] = lambda _: True,
) -> Iterator[Tuple[str, int]]:
stack: list[Tuple[str, int]] = [start]
visited: set[Tuple[str, int]] = set()
while stack:
node = stack.pop()
if node in visited:
continue
visited.add(node)
yield node
# to keep the proper order we need to iterate in reverse
for next in reversed(edges[node].keys()):
if valid(next) and next not in visited:
stack.append(next)
class FixtureRequest(abc.ABC): class FixtureRequest(abc.ABC):
"""The type of the ``request`` fixture. """The type of the ``request`` fixture.
@ -1495,6 +1742,35 @@ class FixtureManager:
return FuncFixtureInfo(argnames, initialnames, names_closure, arg2fixturedefs) return FuncFixtureInfo(argnames, initialnames, names_closure, arg2fixturedefs)
def getfixtureinfo2(
self,
node: nodes.Item,
func: Optional[Callable[..., object]],
cls: Optional[type],
) -> FuncFixtureInfo2:
"""See above"""
if func is not None and not getattr(node, "nofuncargs", False):
argnames = getfuncargnames(func, name=node.name, cls=cls)
else:
argnames = ()
usefixtures = tuple(
arg for mark in node.iter_markers(name="usefixtures") for arg in mark.args
)
autousefixtures = tuple(self._getautousenames(node.nodeid))
# keep the previous order.
# #in most cases this results in a suboptimal order later on
initialnames = autousefixtures + usefixtures + argnames
# unique initialnames >=py3.6 / optimize later
initialnames = tuple(dict.fromkeys(initialnames))
# fixturenames to skip because they would be replaced later on anyway
# convert to set for faster lookup
skipnames = set(_get_direct_parametrize_args(node))
info2 = FuncFixtureInfo2(argnames=argnames, initialnames=initialnames)
self.buildfixtureclosure(info2, node.nodeid, skipnames)
return info2
def pytest_plugin_registered(self, plugin: _PluggyPlugin) -> None: def pytest_plugin_registered(self, plugin: _PluggyPlugin) -> None:
nodeid = None nodeid = None
try: try:
@ -1572,6 +1848,98 @@ class FixtureManager:
fixturenames_closure.sort(key=sort_by_scope, reverse=True) fixturenames_closure.sort(key=sort_by_scope, reverse=True)
return fixturenames_closure, arg2fixturedefs return fixturenames_closure, arg2fixturedefs
def buildfixtureclosure(
self,
info: FuncFixtureInfo2,
nodeid: str,
ignore_args: Collection[str],
) -> None:
# TODO:
# - reword last sentence
# - rethink/get advice for error handling and reporting
# Walk the fixture definitions starting from the initalnames given by the
# info object to build the dependency DAG (directed acyclic graph).
# The reversed topological sort of the DAG is equilvalent to the previous
# fixture_closure but is not ordered by scope. But, because the algorithm
# enforces the invariant that a fixtures can only depend on fixtures of
# the same or higher scope, sorting by scope produces another topological
# sort.
# Note that the insertion algortihm is not stable concerning the scope ordering
cache: Dict[str, Sequence[FixtureDef[Any]]] = info.name2fixturedefs
iter_stack: List[Iterator[str]] = [iter(info.initialnames)]
parent_stack: List[Tuple[str, int]] = []
def retrieve(name: str) -> Optional[Sequence[FixtureDef[Any]]]:
try:
return cache[name]
except KeyError:
defs = self.getfixturedefs(name, nodeid)
if defs:
cache[name] = defs
return defs
else:
return None
def push_onto_stack(node: Tuple[str, int], argnames: Sequence[str]):
if node not in parent_stack:
parent_stack.append(node)
iter_stack.append(iter(argnames))
while iter_stack:
arg = next(iter_stack[-1], None)
if arg:
if arg == "request" or arg in ignore_args:
# do not retrive definitions because they either to not exist
# or are replaced during parametrization.
if parent_stack:
with suppress(DuplicateDependency):
info.add_dependency(parent_stack[-1], (arg, -1))
else:
info.add_node((arg, -1))
else:
defs = retrieve(arg)
if not defs:
# for now just add a edge/node instead of reporting an error
if parent_stack:
with suppress(DuplicateDependency):
info.add_dependency(parent_stack[-1], (arg, -1))
else:
info.add_node((arg, -1))
continue
if parent_stack:
parent = parent_stack[-1]
# entry unconditionally exists at this point
parent_scope = cast(
Sequence[FixtureDef[Any]], retrieve(parent[0])
)[parent[1]]._scope
for i in range(-1, -len(defs) - 1, -1):
try:
info.add_dependency(parent, (arg, i))
except CyclicDependency:
continue
except DuplicateDependency:
break
else:
if defs[i]._scope < parent_scope:
info.remove_dependency(parent, (arg, i))
raise Exception("Scope violation: ...")
push_onto_stack((arg, i), defs[i].argnames)
break
else:
raise Exception("Cycle Detected: ...")
else:
push_onto_stack((arg, -1), defs[-1].argnames)
info.add_node((arg, -1))
else:
# remove the exhausted iterator and backtrack
iter_stack.pop()
if parent_stack:
parent_stack.pop()
# remove potenially orphened nodes from failed insertions
info.prune()
info._compress()
def pytest_generate_tests(self, metafunc: "Metafunc") -> None: def pytest_generate_tests(self, metafunc: "Metafunc") -> None:
"""Generate new tests based on parametrized fixtures used by the given metafunc""" """Generate new tests based on parametrized fixtures used by the given metafunc"""

View File

@ -58,7 +58,7 @@ from _pytest.config.argparsing import Parser
from _pytest.deprecated import check_ispytest from _pytest.deprecated import check_ispytest
from _pytest.fixtures import FixtureDef from _pytest.fixtures import FixtureDef
from _pytest.fixtures import FixtureRequest from _pytest.fixtures import FixtureRequest
from _pytest.fixtures import FuncFixtureInfo from _pytest.fixtures import FuncFixtureInfo2
from _pytest.fixtures import get_scope_node from _pytest.fixtures import get_scope_node
from _pytest.main import Session from _pytest.main import Session
from _pytest.mark import MARK_GEN from _pytest.mark import MARK_GEN
@ -1131,7 +1131,7 @@ class Metafunc:
def __init__( def __init__(
self, self,
definition: "FunctionDefinition", definition: "FunctionDefinition",
fixtureinfo: fixtures.FuncFixtureInfo, fixtureinfo: fixtures.FuncFixtureInfo2,
config: Config, config: Config,
cls=None, cls=None,
module=None, module=None,
@ -1580,7 +1580,7 @@ def _show_fixtures_per_test(config: Config, session: Session) -> None:
def write_item(item: nodes.Item) -> None: def write_item(item: nodes.Item) -> None:
# Not all items have _fixtureinfo attribute. # Not all items have _fixtureinfo attribute.
info: Optional[FuncFixtureInfo] = getattr(item, "_fixtureinfo", None) info: Optional[FuncFixtureInfo2] = getattr(item, "_fixtureinfo", None)
if info is None or not info.name2fixturedefs: if info is None or not info.name2fixturedefs:
# This test item does not use any fixtures. # This test item does not use any fixtures.
return return
@ -1707,7 +1707,7 @@ class Function(PyobjMixin, nodes.Item):
callobj=NOTSET, callobj=NOTSET,
keywords: Optional[Mapping[str, Any]] = None, keywords: Optional[Mapping[str, Any]] = None,
session: Optional[Session] = None, session: Optional[Session] = None,
fixtureinfo: Optional[FuncFixtureInfo] = None, fixtureinfo: Optional[FuncFixtureInfo2] = None,
originalname: Optional[str] = None, originalname: Optional[str] = None,
) -> None: ) -> None:
super().__init__(name, parent, config=config, session=session) super().__init__(name, parent, config=config, session=session)
@ -1743,8 +1743,8 @@ class Function(PyobjMixin, nodes.Item):
if fixtureinfo is None: if fixtureinfo is None:
fm = self.session._fixturemanager fm = self.session._fixturemanager
fixtureinfo = fm.getfixtureinfo(self, self.obj, self.cls) fixtureinfo = fm.getfixtureinfo2(self, self.obj, self.cls)
self._fixtureinfo: FuncFixtureInfo = fixtureinfo self._fixtureinfo: FuncFixtureInfo2 = fixtureinfo
self.fixturenames = fixtureinfo.names_closure self.fixturenames = fixtureinfo.names_closure
self._initrequest() self._initrequest()