Slight change in early teardown solution

Fix some bugs and  do some improvements and refactors
This commit is contained in:
Sadra Barikbin 2023-06-29 10:51:27 +03:30
parent 6fc6b153fd
commit 5749ea2f46
4 changed files with 900 additions and 155 deletions

View File

@ -225,13 +225,19 @@ class FixtureArgKey:
param_value: Optional[Hashable] param_value: Optional[Hashable]
scoped_item_path: Optional[Path] scoped_item_path: Optional[Path]
item_cls: Optional[type] item_cls: Optional[type]
baseid: Optional[str]
def get_fixture_arg_key(item: nodes.Item, argname: str, scope: Scope) -> FixtureArgKey: def get_fixture_arg_key(
item: nodes.Item,
argname: str,
scope: Scope,
baseid: Optional[str] = None,
is_parametrized: Optional[bool] = False,
) -> FixtureArgKey:
param_index = None param_index = None
param_value = None param_value = None
if hasattr(item, "callspec") and argname in item.callspec.params: if is_parametrized:
# Fixture is parametrized.
if isinstance(item.callspec.params[argname], Hashable): if isinstance(item.callspec.params[argname], Hashable):
param_value = item.callspec.params[argname] param_value = item.callspec.params[argname]
else: else:
@ -251,7 +257,9 @@ def get_fixture_arg_key(item: nodes.Item, argname: str, scope: Scope) -> Fixture
else: else:
item_cls = None item_cls = None
return FixtureArgKey(argname, param_index, param_value, scoped_item_path, item_cls) return FixtureArgKey(
argname, param_index, param_value, scoped_item_path, item_cls, baseid
)
def get_fixture_keys(item: nodes.Item, scope: Scope) -> Iterator[FixtureArgKey]: def get_fixture_keys(item: nodes.Item, scope: Scope) -> Iterator[FixtureArgKey]:
@ -259,17 +267,27 @@ def get_fixture_keys(item: nodes.Item, scope: Scope) -> Iterator[FixtureArgKey]:
the specified scope.""" the specified scope."""
assert scope is not Scope.Function assert scope is not Scope.Function
if hasattr(item, "_fixtureinfo"): if hasattr(item, "_fixtureinfo"):
# sort this so that different calls to for argname in item._fixtureinfo.names_closure:
# get_fixture_keys will be deterministic. is_parametrized = (
for argname, fixture_def in sorted(item._fixtureinfo.name2fixturedefs.items()): hasattr(item, "callspec") and argname in item.callspec._arg2scope
# In the case item is parametrized on the `argname` with )
# a scope, it overrides that of the fixture. for i in reversed(
if hasattr(item, "callspec") and argname in item.callspec._arg2scope: range(item._fixtureinfo.name2num_fixturedefs_used[argname])
if item.callspec._arg2scope[argname] != scope: ):
continue fixturedef = item._fixtureinfo.name2fixturedefs[argname][-(i + 1)]
elif fixture_def[-1]._scope != scope: # In the case item is parametrized on the `argname` with
continue # a scope, it overrides that of the fixture.
yield get_fixture_arg_key(item, argname, scope) if (is_parametrized and item.callspec._arg2scope[argname] != scope) or (
not is_parametrized and fixturedef._scope != scope
):
break
yield get_fixture_arg_key(
item,
argname,
scope,
None if fixturedef.is_pseudo else fixturedef.baseid,
is_parametrized,
)
# Algorithm for sorting on a per-parametrized resource setup basis. # Algorithm for sorting on a per-parametrized resource setup basis.
@ -293,17 +311,31 @@ def reorder_items(items: Sequence[nodes.Item]) -> List[nodes.Item]:
for key in keys: for key in keys:
item_d[key].append(item) item_d[key].append(item)
items_dict = dict.fromkeys(items, None) items_dict = dict.fromkeys(items, None)
last_item_by_argkey: Dict[FixtureArgKey, nodes.Item] = {}
reordered_items = list( reordered_items = list(
reorder_items_atscope(items_dict, argkeys_cache, items_by_argkey, Scope.Session) reorder_items_atscope(
items_dict,
argkeys_cache,
items_by_argkey,
last_item_by_argkey,
Scope.Session,
)
) )
for scope in reversed(HIGH_SCOPES): for scope in reversed(HIGH_SCOPES):
for key in items_by_argkey[scope]: for key in items_by_argkey[scope]:
last_item_dependent_on_key = items_by_argkey[scope][key].pop() last_item_dependent_on_key = last_item_by_argkey[key]
fixturedef = last_item_dependent_on_key._fixtureinfo.name2fixturedefs[ if key.baseid is None:
key.argname
][-1]
if fixturedef.is_pseudo:
continue continue
for i in range(
last_item_dependent_on_key._fixtureinfo.name2num_fixturedefs_used[
key.argname
]
):
fixturedef = last_item_dependent_on_key._fixtureinfo.name2fixturedefs[
key.argname
][-(i + 1)]
if fixturedef.baseid == key.baseid:
break
last_item_dependent_on_key.teardown = functools.partial( last_item_dependent_on_key.teardown = functools.partial(
lambda other_finalizers, new_finalizer: [ lambda other_finalizers, new_finalizer: [
finalizer() for finalizer in (new_finalizer, other_finalizers) finalizer() for finalizer in (new_finalizer, other_finalizers)
@ -330,20 +362,28 @@ def fix_cache_order(
if key in ignore: if key in ignore:
continue continue
items_by_argkey[scope][key].appendleft(item) items_by_argkey[scope][key].appendleft(item)
# Make sure last dependent item on a key
# remains updated while reordering.
if items_by_argkey[scope][key][-1] == item:
items_by_argkey[scope][key].pop()
def reorder_items_atscope( def reorder_items_atscope(
items: Dict[nodes.Item, None], items: Dict[nodes.Item, None],
argkeys_cache: Dict[Scope, Dict[nodes.Item, Dict[FixtureArgKey, None]]], argkeys_cache: Dict[Scope, Dict[nodes.Item, Dict[FixtureArgKey, None]]],
items_by_argkey: Dict[Scope, Dict[FixtureArgKey, "Deque[nodes.Item]"]], items_by_argkey: Dict[Scope, Dict[FixtureArgKey, "Deque[nodes.Item]"]],
last_item_by_argkey: Dict[FixtureArgKey, nodes.Item],
scope: Scope, scope: Scope,
) -> Dict[nodes.Item, None]: ) -> Dict[nodes.Item, None]:
if scope is Scope.Function or len(items) < 3: if scope is Scope.Function:
return items return items
elif len(items) < 3:
for item in items:
for key in argkeys_cache[scope].get(item, []):
last_item_by_argkey[key] = item
return reorder_items_atscope(
items,
argkeys_cache,
items_by_argkey,
last_item_by_argkey,
scope.next_lower(),
)
ignore: Set[Optional[FixtureArgKey]] = set() ignore: Set[Optional[FixtureArgKey]] = set()
items_deque = deque(items) items_deque = deque(items)
items_done: Dict[nodes.Item, None] = {} items_done: Dict[nodes.Item, None] = {}
@ -363,20 +403,26 @@ def reorder_items_atscope(
no_argkey_group[item] = None no_argkey_group[item] = None
else: else:
slicing_argkey, _ = argkeys.popitem() slicing_argkey, _ = argkeys.popitem()
# We don't have to remove relevant items from later in the
# deque because they'll just be ignored. # deque because they'll just be ignored.
matching_items = [ for i in reversed(
i for i in scoped_items_by_argkey[slicing_argkey] if i in items dict.fromkeys(scoped_items_by_argkey[slicing_argkey])
] ):
for i in reversed(matching_items): if i not in items:
continue
fix_cache_order(i, argkeys_cache, items_by_argkey, ignore, scope) fix_cache_order(i, argkeys_cache, items_by_argkey, ignore, scope)
items_deque.appendleft(i) items_deque.appendleft(i)
break break
if no_argkey_group: if no_argkey_group:
no_argkey_group = reorder_items_atscope( no_argkey_group = reorder_items_atscope(
no_argkey_group, argkeys_cache, items_by_argkey, scope.next_lower() no_argkey_group,
argkeys_cache,
items_by_argkey,
last_item_by_argkey,
scope.next_lower(),
) )
for item in no_argkey_group: for item in no_argkey_group:
for key in scoped_argkeys_cache.get(item, []):
last_item_by_argkey[key] = item
items_done[item] = None items_done[item] = None
ignore.add(slicing_argkey) ignore.add(slicing_argkey)
return items_done return items_done
@ -384,7 +430,13 @@ def reorder_items_atscope(
@dataclasses.dataclass @dataclasses.dataclass
class FuncFixtureInfo: class FuncFixtureInfo:
__slots__ = ("argnames", "initialnames", "names_closure", "name2fixturedefs") __slots__ = (
"argnames",
"initialnames",
"names_closure",
"name2fixturedefs",
"name2num_fixturedefs_used",
)
# Original function argument names. # Original function argument names.
argnames: Tuple[str, ...] argnames: Tuple[str, ...]
@ -394,6 +446,7 @@ class FuncFixtureInfo:
initialnames: Tuple[str, ...] initialnames: Tuple[str, ...]
names_closure: List[str] names_closure: List[str]
name2fixturedefs: Dict[str, Sequence["FixtureDef[Any]"]] name2fixturedefs: Dict[str, Sequence["FixtureDef[Any]"]]
name2num_fixturedefs_used: Dict[str, int]
def prune_dependency_tree(self) -> None: def prune_dependency_tree(self) -> None:
"""Recompute names_closure from initialnames and name2fixturedefs. """Recompute names_closure from initialnames and name2fixturedefs.
@ -401,26 +454,11 @@ class FuncFixtureInfo:
Can only reduce names_closure, which means that the new closure will Can only reduce names_closure, which means that the new closure will
always be a subset of the old one. The order is preserved. always be a subset of the old one. The order is preserved.
This method is needed because direct parametrization may shadow some This method is needed because dynamic direct parametrization may shadow
of the fixtures that were included in the originally built dependency some of the fixtures that were included in the originally built dependency
tree. In this way the dependency tree can get pruned, and the closure tree. In this way the dependency tree can get pruned, and the closure
of argnames may get reduced. of argnames may get reduced.
""" """
closure: Set[str] = set()
working_set = set(self.initialnames)
while working_set:
argname = working_set.pop()
# Argname may be smth not included in the original names_closure,
# in which case we ignore it. This currently happens with pseudo
# FixtureDefs which wrap 'get_direct_param_fixture_func(request)'.
# So they introduce the new dependency 'request' which might have
# been missing in the original tree (closure).
if argname not in closure and argname in self.names_closure:
closure.add(argname)
if argname in self.name2fixturedefs:
working_set.update(self.name2fixturedefs[argname][-1].argnames)
self.names_closure[:] = sorted(closure, key=self.names_closure.index)
class FixtureRequest: class FixtureRequest:
@ -1407,6 +1445,26 @@ def pytest_addoption(parser: Parser) -> None:
) )
def _get_direct_parametrize_args(node: nodes.Node) -> List[str]:
"""Return all direct parametrization arguments of a node, so we don't
mistake them for fixtures.
Check https://github.com/pytest-dev/pytest/issues/5036.
These things are done later as well when dealing with parametrization
so this could be improved.
"""
parametrize_argnames: List[str] = []
for marker in node.iter_markers(name="parametrize"):
if not marker.kwargs.get("indirect", False):
p_argnames, _ = ParameterSet._parse_parametrize_args(
*marker.args, **marker.kwargs
)
parametrize_argnames.extend(p_argnames)
return parametrize_argnames
class FixtureManager: class FixtureManager:
"""pytest fixture definitions and information is stored and managed """pytest fixture definitions and information is stored and managed
from this class. from this class.
@ -1452,25 +1510,6 @@ class FixtureManager:
} }
session.config.pluginmanager.register(self, "funcmanage") session.config.pluginmanager.register(self, "funcmanage")
def _get_direct_parametrize_args(self, node: nodes.Node) -> List[str]:
"""Return all direct parametrization arguments of a node, so we don't
mistake them for fixtures.
Check https://github.com/pytest-dev/pytest/issues/5036.
These things are done later as well when dealing with parametrization
so this could be improved.
"""
parametrize_argnames: List[str] = []
for marker in node.iter_markers(name="parametrize"):
if not marker.kwargs.get("indirect", False):
p_argnames, _ = ParameterSet._parse_parametrize_args(
*marker.args, **marker.kwargs
)
parametrize_argnames.extend(p_argnames)
return parametrize_argnames
def getfixtureinfo( def getfixtureinfo(
self, node: nodes.Node, func, cls, funcargs: bool = True self, node: nodes.Node, func, cls, funcargs: bool = True
) -> FuncFixtureInfo: ) -> FuncFixtureInfo:
@ -1482,12 +1521,27 @@ class FixtureManager:
usefixtures = tuple( usefixtures = tuple(
arg for mark in node.iter_markers(name="usefixtures") for arg in mark.args arg for mark in node.iter_markers(name="usefixtures") for arg in mark.args
) )
initialnames = usefixtures + argnames initialnames = tuple(
fm = node.session._fixturemanager dict.fromkeys(
initialnames, names_closure, arg2fixturedefs = fm.getfixtureclosure( tuple(self._getautousenames(node.nodeid)) + usefixtures + argnames
initialnames, node, ignore_args=self._get_direct_parametrize_args(node) )
)
arg2fixturedefs: Dict[str, Sequence[FixtureDef[Any]]] = {}
names_closure, arg2num_fixturedefs_used = self.getfixtureclosure(
node,
initialnames,
arg2fixturedefs,
self,
ignore_args=_get_direct_parametrize_args(node),
)
return FuncFixtureInfo(
argnames,
initialnames,
names_closure,
arg2fixturedefs,
arg2num_fixturedefs_used,
) )
return FuncFixtureInfo(argnames, initialnames, names_closure, arg2fixturedefs)
def pytest_plugin_registered(self, plugin: _PluggyPlugin) -> None: def pytest_plugin_registered(self, plugin: _PluggyPlugin) -> None:
nodeid = None nodeid = None
@ -1518,12 +1572,14 @@ class FixtureManager:
if basenames: if basenames:
yield from basenames yield from basenames
@staticmethod
def getfixtureclosure( def getfixtureclosure(
self,
fixturenames: Tuple[str, ...],
parentnode: nodes.Node, parentnode: nodes.Node,
initialnames: Tuple[str],
arg2fixturedefs: Dict[str, Sequence[FixtureDef[Any]]],
fixturemanager: Optional["FixtureManager"] = None,
ignore_args: Sequence[str] = (), ignore_args: Sequence[str] = (),
) -> Tuple[Tuple[str, ...], List[str], Dict[str, Sequence[FixtureDef[Any]]]]: ) -> Tuple[List[str], Dict[str, List[FixtureDef[Any]]]]:
# Collect the closure of all fixtures, starting with the given # Collect the closure of all fixtures, starting with the given
# fixturenames as the initial set. As we have to visit all # fixturenames as the initial set. As we have to visit all
# factory definitions anyway, we also return an arg2fixturedefs # factory definitions anyway, we also return an arg2fixturedefs
@ -1532,44 +1588,74 @@ class FixtureManager:
# (discovering matching fixtures for a given name/node is expensive). # (discovering matching fixtures for a given name/node is expensive).
parentid = parentnode.nodeid parentid = parentnode.nodeid
fixturenames_closure = list(self._getautousenames(parentid)) fixturenames_closure: Dict[str, int] = {}
def merge(otherlist: Iterable[str]) -> None:
for arg in otherlist:
if arg not in fixturenames_closure:
fixturenames_closure.append(arg)
merge(fixturenames)
# At this point, fixturenames_closure contains what we call "initialnames", # At this point, fixturenames_closure contains what we call "initialnames",
# which is a set of fixturenames the function immediately requests. We # which is a set of fixturenames the function immediately requests. We
# need to return it as well, so save this. # need to return it as well, so save this.
initialnames = tuple(fixturenames_closure)
arg2fixturedefs: Dict[str, Sequence[FixtureDef[Any]]] = {} arg2num_fixturedefs_used: Dict[str, int] = defaultdict(lambda: 0)
lastlen = -1 arg2num_def_used_in_path: Dict[str, int] = defaultdict(lambda: 0)
while lastlen != len(fixturenames_closure): nodes_in_fixture_tree: Deque[Tuple[str, bool]] = deque(
lastlen = len(fixturenames_closure) [(name, name != initialnames[-1]) for name in initialnames]
for argname in fixturenames_closure: )
if argname in ignore_args: nodes_in_path: Deque[Tuple[str, bool]] = deque()
continue
if argname in arg2fixturedefs:
continue
fixturedefs = self.getfixturedefs(argname, parentid)
if fixturedefs:
arg2fixturedefs[argname] = fixturedefs
merge(fixturedefs[-1].argnames)
def sort_by_scope(arg_name: str) -> Scope: while nodes_in_fixture_tree:
node, has_sibling = nodes_in_fixture_tree.popleft()
if node not in fixturenames_closure or fixturenames_closure[node][0] > len(
nodes_in_path
):
fixturenames_closure[node] = (
len(nodes_in_path),
len(fixturenames_closure),
)
if node not in arg2fixturedefs:
if node not in ignore_args:
fixturedefs = fixturemanager.getfixturedefs(node, parentid)
if fixturedefs:
arg2fixturedefs[node] = fixturedefs
if node in arg2fixturedefs:
def_index = arg2num_def_used_in_path[node] + 1
if (
def_index <= len(arg2fixturedefs[node])
and def_index > arg2num_fixturedefs_used[node]
):
arg2num_fixturedefs_used[node] = def_index
fixturedef = arg2fixturedefs[node][-def_index]
if fixturedef.argnames:
nodes_in_path.append((node, has_sibling))
arg2num_def_used_in_path[node] += 1
nodes_in_fixture_tree.extendleft(
[
(argname, argname != fixturedef.argnames[-1])
for argname in reversed(fixturedef.argnames)
]
)
continue
while not has_sibling:
try:
node, has_sibling = nodes_in_path.pop()
except IndexError:
assert len(nodes_in_fixture_tree) == 0
break
arg2num_def_used_in_path[node] -= 1
fixturenames_closure_list = list(fixturenames_closure)
def sort_by_scope_depth_and_arrival(arg_name: str) -> Scope:
depth, arrival = fixturenames_closure[arg_name]
try: try:
fixturedefs = arg2fixturedefs[arg_name] fixturedefs = arg2fixturedefs[arg_name]
except KeyError: except KeyError:
return Scope.Function return (Scope.Function, -depth, -arrival)
else: else:
return fixturedefs[-1]._scope return (fixturedefs[-1]._scope, -depth, -arrival)
fixturenames_closure.sort(key=sort_by_scope, reverse=True) fixturenames_closure_list.sort(
return initialnames, fixturenames_closure, arg2fixturedefs key=sort_by_scope_depth_and_arrival, reverse=True
)
return fixturenames_closure_list, arg2num_fixturedefs_used
def pytest_generate_tests(self, metafunc: "Metafunc") -> None: def pytest_generate_tests(self, metafunc: "Metafunc") -> None:
"""Generate new tests based on parametrized fixtures used by the given metafunc""" """Generate new tests based on parametrized fixtures used by the given metafunc"""
@ -1744,6 +1830,13 @@ class FixtureManager:
def _matchfactories( def _matchfactories(
self, fixturedefs: Iterable[FixtureDef[Any]], nodeid: str self, fixturedefs: Iterable[FixtureDef[Any]], nodeid: str
) -> Iterator[FixtureDef[Any]]: ) -> Iterator[FixtureDef[Any]]:
"""Yields the visible fixturedefs to a node with the given id
from among the specified fixturedefs.
:param Iterable[FixtureDef] fixturedefs: The list of specified fixturedefs.
:param str nodeid: Full node id of the node.
:rtype: Iterator[FixtureDef]
"""
parentnodeids = set(nodes.iterparentnodeids(nodeid)) parentnodeids = set(nodes.iterparentnodeids(nodeid))
for fixturedef in fixturedefs: for fixturedef in fixturedefs:
if fixturedef.baseid in parentnodeids: if fixturedef.baseid in parentnodeids:

View File

@ -58,7 +58,9 @@ from _pytest.config.argparsing import Parser
from _pytest.deprecated import check_ispytest from _pytest.deprecated import check_ispytest
from _pytest.deprecated import INSTANCE_COLLECTOR from _pytest.deprecated import INSTANCE_COLLECTOR
from _pytest.deprecated import NOSE_SUPPORT_METHOD from _pytest.deprecated import NOSE_SUPPORT_METHOD
from _pytest.fixtures import _get_direct_parametrize_args
from _pytest.fixtures import FixtureDef from _pytest.fixtures import FixtureDef
from _pytest.fixtures import FixtureManager
from _pytest.fixtures import FixtureRequest from _pytest.fixtures import FixtureRequest
from _pytest.fixtures import FuncFixtureInfo from _pytest.fixtures import FuncFixtureInfo
from _pytest.fixtures import get_scope_node from _pytest.fixtures import get_scope_node
@ -388,6 +390,26 @@ del _EmptyClass
# fmt: on # fmt: on
def unwrap_metafunc_parametrize_and_possibly_prune_dependency_tree(metafunc):
metafunc.parametrize = metafunc._parametrize
del metafunc._parametrize
if metafunc.has_dynamic_parametrize:
# Direct parametrization may have shadowed some fixtures
# so make sure we update what the function really needs.
fixture_closure, arg2num_fixturedefs_used = FixtureManager.getfixtureclosure(
metafunc.definition,
metafunc.definition._fixtureinfo.initialnames,
metafunc._arg2fixturedefs,
ignore_args=_get_direct_parametrize_args(metafunc.definition) + ["request"],
)
metafunc.fixturenames[:] = fixture_closure
metafunc.definition._fixtureinfo.name2num_fixturedefs_used.clear()
metafunc.definition._fixtureinfo.name2num_fixturedefs_used.update(
arg2num_fixturedefs_used
)
del metafunc.has_dynamic_parametrize
class PyCollector(PyobjMixin, nodes.Collector): class PyCollector(PyobjMixin, nodes.Collector):
def funcnamefilter(self, name: str) -> bool: def funcnamefilter(self, name: str) -> bool:
return self._matches_prefix_or_glob_option("python_functions", name) return self._matches_prefix_or_glob_option("python_functions", name)
@ -484,8 +506,6 @@ class PyCollector(PyobjMixin, nodes.Collector):
definition = FunctionDefinition.from_parent(self, name=name, callobj=funcobj) definition = FunctionDefinition.from_parent(self, name=name, callobj=funcobj)
fixtureinfo = definition._fixtureinfo fixtureinfo = definition._fixtureinfo
# pytest_generate_tests impls call metafunc.parametrize() which fills
# metafunc._calls, the outcome of the hook.
metafunc = Metafunc( metafunc = Metafunc(
definition=definition, definition=definition,
fixtureinfo=fixtureinfo, fixtureinfo=fixtureinfo,
@ -494,19 +514,29 @@ class PyCollector(PyobjMixin, nodes.Collector):
module=module, module=module,
_ispytest=True, _ispytest=True,
) )
methods = [] methods = [unwrap_metafunc_parametrize_and_possibly_prune_dependency_tree]
if hasattr(module, "pytest_generate_tests"): if hasattr(module, "pytest_generate_tests"):
methods.append(module.pytest_generate_tests) methods.append(module.pytest_generate_tests)
if cls is not None and hasattr(cls, "pytest_generate_tests"): if cls is not None and hasattr(cls, "pytest_generate_tests"):
methods.append(cls().pytest_generate_tests) methods.append(cls().pytest_generate_tests)
metafunc.has_dynamic_parametrize = False
from functools import wraps
@wraps(metafunc.parametrize)
def set_has_dynamic_parametrize(*args, **kwargs):
metafunc.has_dynamic_parametrize = True
metafunc._parametrize(*args, **kwargs)
metafunc._parametrize = metafunc.parametrize
metafunc.parametrize = set_has_dynamic_parametrize
# pytest_generate_tests impls call metafunc.parametrize() which fills
# metafunc._calls, the outcome of the hook.
self.ihook.pytest_generate_tests.call_extra(methods, dict(metafunc=metafunc)) self.ihook.pytest_generate_tests.call_extra(methods, dict(metafunc=metafunc))
if not metafunc._calls: if not metafunc._calls:
yield Function.from_parent(self, name=name, fixtureinfo=fixtureinfo) yield Function.from_parent(self, name=name, fixtureinfo=fixtureinfo)
else: else:
# Direct parametrization may have shadowed some fixtures
# so make sure we update what the function really needs.
fixtureinfo.prune_dependency_tree()
for callspec in metafunc._calls: for callspec in metafunc._calls:
subname = f"{name}[{callspec.id}]" subname = f"{name}[{callspec.id}]"
yield Function.from_parent( yield Function.from_parent(
@ -1360,7 +1390,7 @@ class Metafunc:
if arg_values_types[argname] == "params": if arg_values_types[argname] == "params":
continue continue
if name2pseudofixturedef is not None and argname in name2pseudofixturedef: if name2pseudofixturedef is not None and argname in name2pseudofixturedef:
arg2fixturedefs[argname] = [name2pseudofixturedef[argname]] fixturedef = name2pseudofixturedef[argname]
else: else:
fixturedef = FixtureDef( fixturedef = FixtureDef(
fixturemanager=self.definition.session._fixturemanager, fixturemanager=self.definition.session._fixturemanager,
@ -1373,9 +1403,10 @@ class Metafunc:
ids=None, ids=None,
is_pseudo=True, is_pseudo=True,
) )
arg2fixturedefs[argname] = [fixturedef]
if name2pseudofixturedef is not None: if name2pseudofixturedef is not None:
name2pseudofixturedef[argname] = fixturedef name2pseudofixturedef[argname] = fixturedef
arg2fixturedefs[argname] = [fixturedef]
self.definition._fixtureinfo.name2num_fixturedefs_used[argname] = 1
# Create the new calls: if we are parametrize() multiple times (by applying the decorator # Create the new calls: if we are parametrize() multiple times (by applying the decorator
# more than once) then we accumulate those calls generating the cartesian product # more than once) then we accumulate those calls generating the cartesian product
@ -1557,7 +1588,7 @@ def _find_parametrized_scope(
if all_arguments_are_fixtures: if all_arguments_are_fixtures:
fixturedefs = arg2fixturedefs or {} fixturedefs = arg2fixturedefs or {}
used_scopes = [ used_scopes = [
fixturedef[0]._scope fixturedef[0]._scope # Shouldn't be -1 ?
for name, fixturedef in fixturedefs.items() for name, fixturedef in fixturedefs.items()
if name in argnames if name in argnames
] ]

View File

@ -2,6 +2,7 @@ import os
import sys import sys
import textwrap import textwrap
from pathlib import Path from pathlib import Path
from unittest.mock import Mock
import pytest import pytest
from _pytest import fixtures from _pytest import fixtures
@ -2681,16 +2682,19 @@ class TestFixtureMarker:
""" """
) )
result = pytester.runpytest("-v") result = pytester.runpytest("-v")
# Order changed because fixture keys were sorted by their names in fixtures::get_fixture_keys
# beforehand so encap key came before flavor. This isn't problematic here as both fixtures
# are session-scoped but when this isn't the case, it might be problematic.
result.stdout.fnmatch_lines( result.stdout.fnmatch_lines(
""" """
test_dynamic_parametrized_ordering.py::test[flavor1-vxlan] PASSED test_dynamic_parametrized_ordering.py::test[flavor1-vxlan] PASSED
test_dynamic_parametrized_ordering.py::test2[flavor1-vxlan] PASSED test_dynamic_parametrized_ordering.py::test2[flavor1-vxlan] PASSED
test_dynamic_parametrized_ordering.py::test[flavor2-vxlan] PASSED
test_dynamic_parametrized_ordering.py::test2[flavor2-vxlan] PASSED
test_dynamic_parametrized_ordering.py::test[flavor2-vlan] PASSED
test_dynamic_parametrized_ordering.py::test2[flavor2-vlan] PASSED
test_dynamic_parametrized_ordering.py::test[flavor1-vlan] PASSED test_dynamic_parametrized_ordering.py::test[flavor1-vlan] PASSED
test_dynamic_parametrized_ordering.py::test2[flavor1-vlan] PASSED test_dynamic_parametrized_ordering.py::test2[flavor1-vlan] PASSED
test_dynamic_parametrized_ordering.py::test[flavor2-vlan] PASSED
test_dynamic_parametrized_ordering.py::test2[flavor2-vlan] PASSED
test_dynamic_parametrized_ordering.py::test[flavor2-vxlan] PASSED
test_dynamic_parametrized_ordering.py::test2[flavor2-vxlan] PASSED
""" """
) )
@ -4475,39 +4479,39 @@ def test_yield_fixture_with_no_value(pytester: Pytester) -> None:
assert result.ret == ExitCode.TESTS_FAILED assert result.ret == ExitCode.TESTS_FAILED
def test_teardown_high_scope_fixture_at_last_dependent_item_simple( def test_early_teardown_simple(
pytester: Pytester, pytester: Pytester,
) -> None: ) -> None:
pytester.makepyfile( pytester.makepyfile(
""" """
import pytest import pytest
@pytest.fixture(scope='module', params=[None]) @pytest.fixture(scope='module')
def fixture(): def fixture():
yield pass
print("Tearing down fixture!")
def test_0(fixture): def test_0(fixture):
pass pass
def test_1(fixture): def test_1(fixture):
print("Running test_1!") pass
def test_2(): def test_2():
print("Running test_2!") pass
""" """
) )
result = pytester.runpytest("-s") result = pytester.runpytest("--setup-show")
assert result.ret == 0
result.stdout.fnmatch_lines( result.stdout.fnmatch_lines(
[ [
"*Running test_1!*", "*SETUP M fixture*",
"*Tearing down fixture!*", "*test_0*",
"*Running test_2!*", "*test_1*",
"*TEARDOWN M fixture*",
"*test_2*",
] ]
) )
def test_teardown_high_scope_fixture_at_last_dependent_item_simple_2( def test_early_teardown_simple_2(
pytester: Pytester, pytester: Pytester,
) -> None: ) -> None:
pytester.makepyfile( pytester.makepyfile(
@ -4515,37 +4519,75 @@ def test_teardown_high_scope_fixture_at_last_dependent_item_simple_2(
import pytest import pytest
@pytest.fixture(scope='module', params=[None]) @pytest.fixture(scope='module', params=[None])
def fixture1(): def fixture1():
yield pass
print("Tearing down fixture!")
@pytest.fixture(scope='module', params=[None]) @pytest.fixture(scope='module', params=[None])
def fixture2(): def fixture2():
yield pass
print("Tearing down fixture!")
def test_0(fixture1): def test_0(fixture1):
pass pass
def test_1(fixture1, fixture2): def test_1(fixture1, fixture2):
print("Running test_1!") pass
def test_2(): def test_2():
print("Running test_2!") pass
""" """
) )
result = pytester.runpytest("-s") result = pytester.runpytest("--setup-show")
assert result.ret == 0 assert result.ret == 0
result.stdout.fnmatch_lines( result.stdout.fnmatch_lines(
[ [
"*Running test_1!*", "*SETUP M fixture1*",
"*Tearing down fixture!*", "*test_0*",
"*Tearing down fixture!*", "*SETUP M fixture2*",
"*Running test_2!*", "*test_1*",
"*TEARDOWN M fixture2*",
"*TEARDOWN M fixture1*",
"*test_2*",
] ]
) )
def test_teardown_high_scope_fixture_at_last_dependent_item_complex( def test_early_teardown_simple_3(pytester: Pytester):
pytester.makepyfile(
"""
import pytest
@pytest.fixture(scope='module')
def fixture1():
pass
@pytest.fixture(scope='module')
def fixture2():
pass
def test_0(fixture1, fixture2):
pass
def test_1(fixture1):
pass
def test_2(fixture1, fixture2):
pass
"""
)
result = pytester.runpytest("--setup-show")
result.stdout.fnmatch_lines(
[
"*SETUP M fixture1*",
"*SETUP M fixture2*",
"*test_0*",
"*test_2*",
"*TEARDOWN M fixture2*",
"*test_1*",
"*TEARDOWN M fixture1*",
],
consecutive=True,
)
def test_early_teardown_complex(
pytester: Pytester, pytester: Pytester,
) -> None: ) -> None:
pytester.makepyfile( pytester.makepyfile(
@ -4634,6 +4676,141 @@ def test_teardown_high_scope_fixture_at_last_dependent_item_complex(
) )
def test_fixtureclosure_contains_shadowed_fixtures(pytester: Pytester):
pytester.makeconftest(
"""
import pytest
@pytest.fixture
def fixt0():
pass
@pytest.fixture
def fixt1():
pass
@pytest.fixture
def fixt2(fixt0, fixt1):
pass
"""
)
pytester.makepyfile(
"""
import pytest
@pytest.fixture
def fixt2(fixt2):
pass
@pytest.fixture
def fixt3():
pass
def test(fixt2, fixt3):
pass
"""
)
result = pytester.runpytest("--setup-show")
result.stdout.fnmatch_lines(
[
"*SETUP F fixt0*",
"*SETUP F fixt1*",
"*SETUP F fixt2 (fixtures used: fixt0, fixt1)*",
"*SETUP F fixt2 (fixtures used: fixt2)*",
"*SETUP F fixt3*",
"*test (fixtures used: fixt0, fixt1, fixt2, fixt3)*",
]
)
def test_early_teardown_fixture_both_overriden_and_being_used(pytester: Pytester):
pytester.makeconftest(
"""
import pytest
@pytest.fixture(scope='session')
def shadowed():
pass
@pytest.fixture
def intermediary(shadowed):
pass
"""
)
pytester.makepyfile(
test_a="""
import pytest
def test_0(shadowed):
pass
""",
test_b="""
import pytest
@pytest.fixture(scope='session')
def shadowed():
pass
def test_1():
pass
def test_2(shadowed, intermediary):
pass
""",
)
result = pytester.runpytest("--setup-show")
result.stdout.fnmatch_lines(
[
"*SETUP S shadowed*",
"*test_0*",
"*TEARDOWN S shadowed*",
"*test_1*",
"*SETUP S shadowed*",
"*SETUP F intermediary*",
"*test_2*",
"*TEARDOWN F intermediary*",
"*TEARDOWN S shadowed*",
]
)
def test_early_teardown_homonym_session_scoped_fixtures(pytester: Pytester):
"""Session-scoped fixtures, as only argname and baseid are active in their
corresponding arg keys."""
pytester.makepyfile(
test_a="""
import pytest
@pytest.fixture(scope='session')
def fixture():
yield 0
def test_0(fixture):
assert fixture == 0
""",
test_b="""
import pytest
@pytest.fixture(scope='session')
def fixture():
yield 1
def test_1(fixture):
assert fixture == 1
""",
)
result = pytester.runpytest("--setup-show")
assert result.ret == 0
result.stdout.fnmatch_lines(
[
"*SETUP S fixture*",
"*test_0*",
"*TEARDOWN S fixture*",
"*SETUP S fixture",
"*test_1*",
"*TEARDOWN S fixture*",
]
)
def test_reorder_with_nonparametrized_fixtures(pytester: Pytester): def test_reorder_with_nonparametrized_fixtures(pytester: Pytester):
path = pytester.makepyfile( path = pytester.makepyfile(
""" """
@ -4696,6 +4873,326 @@ def test_reorder_with_both_parametrized_and_nonparametrized_fixtures(
result.stdout.fnmatch_lines([f"*test_{i}*" for i in [0, 2, 1]]) result.stdout.fnmatch_lines([f"*test_{i}*" for i in [0, 2, 1]])
def test_early_teardown_parametrized_homonym_parent_fixture(pytester: Pytester):
pytester.makeconftest(
"""
import pytest
@pytest.fixture(params=[0, 1], scope='session')
def fixture(request):
pass
"""
)
pytester.makepyfile(
test_a="""
import pytest
@pytest.fixture(scope='session')
def fixture(fixture):
pass
def test_0(fixture):
pass
def test_1():
pass
def test_2(fixture):
pass
""",
test_b="""
def test_3(fixture):
pass
def test_4():
pass
""",
test_c="""
import pytest
@pytest.fixture(scope='session')
def fixture():
pass
def test_5(fixture):
pass
""",
)
result = pytester.runpytest("--setup-show")
result.stdout.re_match_lines(
[
r"\s*SETUP S fixture\[0\].*",
r"\s*SETUP S fixture.*",
r".*test_0\[0\].*",
r".*test_2\[0\].*",
r"\s*TEARDOWN S fixture.*",
r".*test_3\[0\].*",
r"\s*TEARDOWN S fixture\[0\].*",
r"\s*SETUP S fixture\[1\].*",
r"\s*SETUP S fixture.*",
r".*test_0\[1\].*",
r".*test_2\[1\].*",
r"\s*TEARDOWN S fixture.*",
r".*test_3\[1\].*",
r"\s*TEARDOWN S fixture\[1\].*",
r".*test_1.*",
r".*test_4.*",
r"\s*SETUP S fixture.*",
r".*test_5.*",
r"\s*TEARDOWN S fixture.*",
]
)
def test_early_teardown_parametrized_homonym_parent_and_child_fixture(
pytester: Pytester,
):
pytester.makeconftest(
"""
import pytest
@pytest.fixture(params=[0, 1], scope='session')
def fixture(request):
pass
"""
)
pytester.makepyfile(
test_a="""
import pytest
@pytest.fixture(params=[2, 3], scope='session')
def fixture(request, fixture):
pass
def test_0(fixture):
pass
def test_1():
pass
def test_2(fixture):
pass
""",
test_b="""
def test_3(fixture):
pass
def test_4():
pass
def test_5(fixture):
pass
""",
)
result = pytester.runpytest("--setup-show")
result.stdout.re_match_lines(
[
r"\s*SETUP S fixture.*",
r"\s*SETUP S fixture \(fixtures used: fixture\)\[2\].*",
r".*test_0.*",
r".*test_2.*",
r"\s*TEARDOWN S fixture\[2\].*",
r"\s*TEARDOWN S fixture.*",
r"\s*SETUP S fixture.*",
r"\s*SETUP S fixture \(fixtures used: fixture\)\[3\].*",
r".*test_0.*",
r".*test_2.*",
r"\s*TEARDOWN S fixture\[3\].*",
r"\s*TEARDOWN S fixture.*",
r".*test_1.*",
r"\s*SETUP S fixture\[0\].*",
r".*test_3.*",
r".*test_5.*",
r"\s*TEARDOWN S fixture\[0\].*",
r"\s*SETUP S fixture\[1\].*",
r".*test_3.*",
r".*test_5.*",
r"\s*TEARDOWN S fixture\[1\].*",
r".*test_4.*",
]
)
def test_early_teardown_parametrized_overriden_and_overrider_fixture(
pytester: Pytester,
):
pytester.makeconftest(
"""
import pytest
@pytest.fixture(params=[0, 1], scope='session')
def fixture(request):
pass
"""
)
pytester.makepyfile(
test_a="""
def test_0(fixture):
pass
def test_1():
pass
def test_2(fixture):
pass
def test_3():
pass
""",
test_b="""
import pytest
@pytest.fixture(params=[2, 3], scope='session')
def fixture(request):
pass
def test_4(fixture):
pass
def test_5():
pass
def test_6(fixture):
pass
""",
)
result = pytester.runpytest("--setup-show")
result.stdout.re_match_lines(
[
r"\s*SETUP S fixture\[0\].*",
r".*test_0.*",
r".*test_2.*",
r"\s*TEARDOWN S fixture\[0\].*",
r"\s*SETUP S fixture\[1\].*",
r".*test_0.*",
r".*test_2.*",
r"\s*TEARDOWN S fixture\[1\].*",
r".*test_3.*",
r"\s*SETUP S fixture\[2\].*",
r".*test_4.*",
r".*test_6.*",
r"\s*TEARDOWN S fixture\[2\].*",
r"\s*SETUP S fixture\[3\].*",
r".*test_4.*",
r".*test_6.*",
r"\s*TEARDOWN S fixture\[3\].*",
r".*test_5.*",
]
)
def test_early_teardown_indirectly_parametrized_fixture(pytester: Pytester):
pytester.makeconftest(
"""
import pytest
@pytest.fixture(params=[0, 1], scope='session')
def fixture(request):
pass
"""
)
pytester.makepyfile(
test_a="""
import pytest
@pytest.fixture(scope='session')
def fixture(request, fixture):
pass
@pytest.mark.parametrize('fixture', [2, 3], indirect=True)
def test_0(fixture):
pass
def test_1():
pass
def test_2(fixture):
pass
def test_3():
pass
def test_4(fixture):
pass
""",
test_b="""
def test_5():
pass
def test_6(fixture):
pass
""",
)
result = pytester.runpytest("--setup-show")
result.stdout.re_match_lines(
[
r"\s*SETUP S fixture.*",
r"\s*SETUP S fixture \(fixtures used: fixture\)\[2\].*",
r".*test_0\[2\].*",
r"\s*TEARDOWN S fixture\[2\].*",
# One might expect conftest::fixture not to tear down here, as test_0[3] will use it afterwards,
# but since conftest::fixture gets parameter although it's not parametrized on test_0, it tears down
# itself as soon as it sees the parameter, which has nothing to do with, has changed from 2 to 3.
# In the future we could change callspec param dict to have the fixture baseid in its key as well to
# satisfy this expectation.
r"\s*TEARDOWN S fixture.*",
r"\s*SETUP S fixture.*",
r"\s*SETUP S fixture \(fixtures used: fixture\)\[3\].*",
r".*test_0\[3\].*",
r"\s*TEARDOWN S fixture\[3\].*",
r"\s*TEARDOWN S fixture.*",
r".*test_1.*",
r"\s*SETUP S fixture\[0\].*",
r"\s*SETUP S fixture.*",
r".*test_2\[0\].*",
r".*test_4\[0\].*",
r"\s*TEARDOWN S fixture.*",
r".*test_6\[0\].*",
r"\s*TEARDOWN S fixture\[0\].*",
r"\s*SETUP S fixture\[1\].*",
r"\s*SETUP S fixture.*",
r".*test_2\[1\].*",
r".*test_4\[1\].*",
r"\s*TEARDOWN S fixture.*",
r".*test_6\[1\].*",
r"\s*TEARDOWN S fixture\[1\].*",
r".*test_3.*",
r".*test_5.*",
]
)
def test_item_determines_which_def_to_use_not_the_requester(pytester: Pytester):
pytester.makeconftest(
"""
import pytest
@pytest.fixture
def fixture():
yield 1
@pytest.fixture
def intermediary(fixture):
yield fixture
"""
)
pytester.makepyfile(
"""
import pytest
@pytest.fixture
def fixture():
yield 2
def test_0(intermediary):
"If requester(intermediary here) was to determine which fixture to use, we should have had 1"
assert intermediary == 2
"""
)
result = pytester.runpytest()
result.ret == 0
def test_add_new_test_dependent_on_a_fixuture_and_use_nfplugin(pytester: Pytester): def test_add_new_test_dependent_on_a_fixuture_and_use_nfplugin(pytester: Pytester):
test_module_string = """ test_module_string = """
import pytest import pytest
@ -4765,9 +5262,6 @@ def test_last_dependent_test_on_a_fixture_is_in_last_failed_using_lfplugin(
) )
@pytest.mark.xfail(
reason="We do not attempt to tear down early the fixture that is overridden and also is used"
)
def test_early_teardown_of_overridden_and_being_used_fixture( def test_early_teardown_of_overridden_and_being_used_fixture(
pytester: Pytester, pytester: Pytester,
) -> None: ) -> None:
@ -4777,8 +5271,8 @@ def test_early_teardown_of_overridden_and_being_used_fixture(
@pytest.fixture(scope='module') @pytest.fixture(scope='module')
def fixture0(): def fixture0():
yield None # This fixture is used by its overrider
print("Tearing down higher-level fixture0") pass
""" """
) )
pytester.makepyfile( pytester.makepyfile(
@ -4787,23 +5281,26 @@ def test_early_teardown_of_overridden_and_being_used_fixture(
@pytest.fixture(scope='module') @pytest.fixture(scope='module')
def fixture0(fixture0): def fixture0(fixture0):
yield None pass
print("Tearing down lower-level fixture0")
def test_0(fixture0): def test_0(fixture0):
pass pass
def test_1(): def test_1():
print("Both `fixture0`s should have been torn down") pass
""" """
) )
result = pytester.runpytest("-s") result = pytester.runpytest("--setup-show")
result.stdout.fnmatch_lines( result.stdout.fnmatch_lines(
[ [
"*Tearing down lower-level fixture0*", "*SETUP M fixture0*",
"*Tearing down higher-level fixture0*", "*SETUP M fixture0*",
"*Both `fixture0`s should have been torn down*", "*test_0*",
] "*TEARDOWN M fixture0*",
"*TEARDOWN M fixture0*",
"*test_1*",
],
consecutive=True,
) )
@ -5010,3 +5507,125 @@ def test_early_teardown_does_not_occur_for_pseudo_fixtures(pytester: Pytester) -
import functools import functools
assert not any([isinstance(item.teardown, functools.partial) for item in items]) assert not any([isinstance(item.teardown, functools.partial) for item in items])
def test_fixture_info_after_dynamic_parametrize(pytester: Pytester) -> None:
pytester.makeconftest(
"""
import pytest
@pytest.fixture(scope='session', params=[0, 1])
def fixture1(request):
pass
@pytest.fixture(scope='session')
def fixture2(fixture1):
pass
@pytest.fixture(scope='session', params=[2, 3])
def fixture3(request, fixture2):
pass
"""
)
pytester.makepyfile(
"""
import pytest
def pytest_generate_tests(metafunc):
metafunc.parametrize("fixture2", [4, 5], scope='session')
@pytest.fixture(scope='session')
def fixture4():
pass
@pytest.fixture(scope='session')
def fixture2(fixture3, fixture4):
pass
def test(fixture2):
pass
"""
)
res = pytester.inline_run()
res.assertoutcome(passed=2)
def test_reordering_after_dynamic_parametrize(pytester: Pytester):
pytester.makepyfile(
"""
import pytest
def pytest_generate_tests(metafunc):
if metafunc.definition.name == "test_0":
metafunc.parametrize("fixture2", [0])
@pytest.fixture(scope='module')
def fixture1():
pass
@pytest.fixture(scope='module')
def fixture2(fixture1):
pass
def test_0(fixture2):
pass
def test_1():
pass
def test_2(fixture1):
pass
"""
)
result = pytester.runpytest("--collect-only")
result.stdout.fnmatch_lines(
[
"*test_0*",
"*test_1*",
"*test_2*",
],
consecutive=True,
)
def test_dont_recompute_dependency_tree_if_no_dynamic_parametrize(
pytester: Pytester, monkeypatch: MonkeyPatch
):
pytester.makepyfile(
"""
import pytest
def pytest_generate_tests(metafunc):
if metafunc.definition.name == "test_0":
metafunc.parametrize("fixture", [0])
@pytest.fixture(scope='module')
def fixture():
pass
def test_0(fixture):
pass
def test_1():
pass
@pytest.mark.parametrize("fixture", [0])
def test_2(fixture):
pass
@pytest.mark.parametrize("fixture", [0], indirect=True)
def test_3(fixture):
pass
"""
)
from _pytest.fixtures import FixtureManager
with monkeypatch.context() as m:
mocked_function = Mock(wraps=FixtureManager.getfixtureclosure)
m.setattr(FixtureManager, "getfixtureclosure", mocked_function)
pytester.inline_run()
assert len(mocked_function.call_args_list) == 5
assert mocked_function.call_args_list[0].args[0].nodeid.endswith("test_0")
assert mocked_function.call_args_list[1].args[0].nodeid.endswith("test_0")
assert mocked_function.call_args_list[2].args[0].nodeid.endswith("test_1")
assert mocked_function.call_args_list[3].args[0].nodeid.endswith("test_2")
assert mocked_function.call_args_list[4].args[0].nodeid.endswith("test_3")

View File

@ -35,6 +35,7 @@ class TestMetafunc:
# initialization. # initialization.
class FuncFixtureInfoMock: class FuncFixtureInfoMock:
name2fixturedefs = {} name2fixturedefs = {}
name2num_fixturedefs_used = {}
def __init__(self, names): def __init__(self, names):
self.names_closure = names self.names_closure = names
@ -55,6 +56,7 @@ class TestMetafunc:
names = getfuncargnames(func) names = getfuncargnames(func)
fixtureinfo: Any = FuncFixtureInfoMock(names) fixtureinfo: Any = FuncFixtureInfoMock(names)
definition: Any = DefinitionMock._create(obj=func, _nodeid="mock::nodeid") definition: Any = DefinitionMock._create(obj=func, _nodeid="mock::nodeid")
definition._fixtureinfo = fixtureinfo
definition.session = SessionMock(FixtureManagerMock({})) definition.session = SessionMock(FixtureManagerMock({}))
return python.Metafunc(definition, fixtureinfo, config, _ispytest=True) return python.Metafunc(definition, fixtureinfo, config, _ispytest=True)