New solution

In pytest_collection_modifyitems using the global information being collected for reordering
This commit is contained in:
Sadra Barikbin 2023-03-26 22:24:52 +03:30
parent a3b39069bc
commit e6c01ca71a
7 changed files with 740 additions and 156 deletions

View File

@ -312,6 +312,7 @@ Ross Lawley
Ruaridh Williamson
Russel Winder
Ryan Wooden
Sadra Barikbin
Saiprasad Kale
Samuel Colvin
Samuel Dion-Girardeau

View File

@ -15,6 +15,7 @@ from typing import cast
from typing import Dict
from typing import Generator
from typing import Generic
from typing import Hashable
from typing import Iterable
from typing import Iterator
from typing import List
@ -146,80 +147,60 @@ def get_scope_node(
assert_never(scope)
def resolve_unique_values_and_their_indices_in_parametersets(
argnames: Sequence[str],
parametersets: Sequence[ParameterSet],
) -> Tuple[Dict[str, List[object]], List[Tuple[int]]]:
"""Resolve unique values and their indices in parameter sets. The index of a value
is determined by when it appears in the possible values for the first time.
For example, given ``argnames`` and ``parametersets`` below, the result would be:
::
argnames = ["A", "B", "C"]
parametersets = [("a1", "b1", "c1"), ("a1", "b2", "c1"), ("a1", "b3", "c2")]
result[0] = {"A": ["a1"], "B": ["b1", "b2", "b3"], "C": ["c1", "c2"]}
result[1] = [(0, 0, 0), (0, 1, 0), (0, 2, 1)]
result is used in reordering `indirect`ly parametrized with multiple
parameters or directly parametrized tests to keep items using the same fixture or
pseudo-fixture values respectively, close together.
:param argnames:
Argument names passed to ``parametrize()``.
:param parametersets:
The parameter sets, each containing a set of values corresponding
to ``argnames``.
:returns:
Tuple of unique parameter values and their indices in parametersets.
"""
indices = []
argname_value_indices_for_hashable_ones: Dict[str, Dict[object, int]] = defaultdict(dict)
argvalues_count: Dict[str, int] = defaultdict(lambda: 0)
unique_values: Dict[str, List[object]] = defaultdict(list)
for i, argname in enumerate(argnames):
argname_indices = []
for parameterset in parametersets:
value = parameterset.values[i]
try:
argname_indices.append(argname_value_indices_for_hashable_ones[argname][value])
except KeyError: # New unique value
argname_value_indices_for_hashable_ones[argname][value] = argvalues_count[argname]
argname_indices.append(argvalues_count[argname])
argvalues_count[argname] += 1
unique_values[argname].append(value)
except TypeError: # `value` is not hashable
argname_indices.append(argvalues_count[argname])
argvalues_count[argname] += 1
unique_values[argname].append(value)
indices.append(argname_indices)
return unique_values, list(zip(*indices))
# Used for storing artificial fixturedefs for direct parametrization.
name2pseudofixturedef_key = StashKey[Dict[str, "FixtureDef[Any]"]]()
def add_funcarg_pseudo_fixture_def(
collector: nodes.Collector, metafunc: "Metafunc", fixturemanager: "FixtureManager"
) -> None:
# This function will transform all collected calls to functions
# if they use direct funcargs (i.e. direct parametrization)
# because we want later test execution to be able to rely on
# an existing FixtureDef structure for all arguments.
# XXX we can probably avoid this algorithm if we modify CallSpec2
# to directly care for creating the fixturedefs within its methods.
if not metafunc._calls[0].funcargs:
# This function call does not have direct parametrization.
return
# Collect funcargs of all callspecs into a list of values.
arg2params: Dict[str, List[object]] = {}
arg2scope: Dict[str, Scope] = {}
for callspec in metafunc._calls:
for argname, argvalue in callspec.funcargs.items():
assert argname not in callspec.params
callspec.params[argname] = argvalue
arg2params_list = arg2params.setdefault(argname, [])
callspec.indices[argname] = len(arg2params_list)
arg2params_list.append(argvalue)
if argname not in arg2scope:
scope = callspec._arg2scope.get(argname, Scope.Function)
arg2scope[argname] = scope
callspec.funcargs.clear()
# Register artificial FixtureDef's so that later at test execution
# time we can rely on a proper FixtureDef to exist for fixture setup.
arg2fixturedefs = metafunc._arg2fixturedefs
for argname, valuelist in arg2params.items():
# If we have a scope that is higher than function, we need
# to make sure we only ever create an according fixturedef on
# a per-scope basis. We thus store and cache the fixturedef on the
# node related to the scope.
scope = arg2scope[argname]
node = None
if scope is not Scope.Function:
node = get_scope_node(collector, scope)
if node is None:
assert scope is Scope.Class and isinstance(
collector, _pytest.python.Module
)
# Use module-level collector for class-scope (for now).
node = collector
if node is None:
name2pseudofixturedef = None
else:
default: Dict[str, FixtureDef[Any]] = {}
name2pseudofixturedef = node.stash.setdefault(
name2pseudofixturedef_key, default
)
if name2pseudofixturedef is not None and argname in name2pseudofixturedef:
arg2fixturedefs[argname] = [name2pseudofixturedef[argname]]
else:
fixturedef = FixtureDef(
fixturemanager=fixturemanager,
baseid="",
argname=argname,
func=get_direct_param_fixture_func,
scope=arg2scope[argname],
params=valuelist,
unittest=False,
ids=None,
)
arg2fixturedefs[argname] = [fixturedef]
if name2pseudofixturedef is not None:
name2pseudofixturedef[argname] = fixturedef
def getfixturemarker(obj: object) -> Optional["FixtureFunctionMarker"]:
"""Return fixturemarker or None if it doesn't exist or raised
exceptions."""
@ -229,38 +210,58 @@ def getfixturemarker(obj: object) -> Optional["FixtureFunctionMarker"]:
)
# Parametrized fixture key, helper alias for code below.
_Key = Tuple[object, ...]
@dataclasses.dataclass(frozen=True)
class FixtureArgKey:
argname: str
param_index: Optional[int]
param_value: Optional[Hashable]
scoped_item_path: Optional[Path]
item_cls: Optional[type]
def get_parametrized_fixture_keys(item: nodes.Item, scope: Scope) -> Iterator[_Key]:
def get_fixture_arg_key(item: nodes.Item, argname: str, scope: Scope) -> FixtureArgKey:
param_index = None
param_value = None
if hasattr(item, 'callspec') and argname in item.callspec.params:
# Fixture is parametrized.
if isinstance(item.callspec.params[argname], Hashable):
param_value = item.callspec.params[argname]
else:
param_index = item.callspec.indices[argname]
if scope is Scope.Session:
scoped_item_path = None
elif scope is Scope.Package:
scoped_item_path = item.path.parent
elif scope in (Scope.Module, Scope.Class):
scoped_item_path = item.path
else:
assert_never(scope)
if scope is Scope.Class and type(item).__name__ != "DoctestItem":
item_cls = item.cls # type: ignore[attr-defined]
else:
item_cls = None
return FixtureArgKey(argname, param_index, param_value, scoped_item_path, item_cls)
def get_fixture_keys(item: nodes.Item, scope: Scope) -> Iterator[FixtureArgKey]:
"""Return list of keys for all parametrized arguments which match
the specified scope."""
assert scope is not Scope.Function
try:
callspec = item.callspec # type: ignore[attr-defined]
except AttributeError:
pass
else:
cs: CallSpec2 = callspec
# cs.indices.items() is random order of argnames. Need to
if hasattr(item, '_fixtureinfo'):
# sort this so that different calls to
# get_parametrized_fixture_keys will be deterministic.
for argname, param_index in sorted(cs.indices.items()):
if cs._arg2scope[argname] != scope:
# get_fixture_keys will be deterministic.
for argname, fixture_def in sorted(item._fixtureinfo.name2fixturedefs.items()):
# In the case item is parametrized on the `argname` with
# a scope, it overrides that of the fixture.
if hasattr(item, 'callspec') and argname in item.callspec._arg2scope:
if item.callspec._arg2scope[argname] != scope:
continue
elif fixture_def[-1]._scope != scope:
continue
if scope is Scope.Session:
key: _Key = (argname, param_index)
elif scope is Scope.Package:
key = (argname, param_index, item.path.parent)
elif scope is Scope.Module:
key = (argname, param_index, item.path)
elif scope is Scope.Class:
item_cls = item.cls # type: ignore[attr-defined]
key = (argname, param_index, item.path, item_cls)
else:
assert_never(scope)
yield key
yield get_fixture_arg_key(item, argname, scope)
# Algorithm for sorting on a per-parametrized resource setup basis.
@ -270,44 +271,66 @@ def get_parametrized_fixture_keys(item: nodes.Item, scope: Scope) -> Iterator[_K
def reorder_items(items: Sequence[nodes.Item]) -> List[nodes.Item]:
argkeys_cache: Dict[Scope, Dict[nodes.Item, Dict[_Key, None]]] = {}
items_by_argkey: Dict[Scope, Dict[_Key, Deque[nodes.Item]]] = {}
argkeys_cache: Dict[Scope, Dict[nodes.Item, Dict[FixtureArgKey, None]]] = {}
items_by_argkey: Dict[Scope, Dict[FixtureArgKey, Deque[nodes.Item]]] = {}
for scope in HIGH_SCOPES:
d: Dict[nodes.Item, Dict[_Key, None]] = {}
d: Dict[nodes.Item, Dict[FixtureArgKey, None]] = {}
argkeys_cache[scope] = d
item_d: Dict[_Key, Deque[nodes.Item]] = defaultdict(deque)
item_d: Dict[FixtureArgKey, Deque[nodes.Item]] = defaultdict(deque)
items_by_argkey[scope] = item_d
for item in items:
keys = dict.fromkeys(get_parametrized_fixture_keys(item, scope), None)
keys = dict.fromkeys(get_fixture_keys(item, scope), None)
if keys:
d[item] = keys
for key in keys:
item_d[key].append(item)
items_dict = dict.fromkeys(items, None)
return list(
reordered_items = list(
reorder_items_atscope(items_dict, argkeys_cache, items_by_argkey, Scope.Session)
)
for scope in reversed(HIGH_SCOPES):
for key in items_by_argkey[scope]:
last_item_dependent_on_key = items_by_argkey[scope][key].pop()
fixturedef = last_item_dependent_on_key._fixtureinfo.name2fixturedefs[key.argname][-1]
if fixturedef.is_pseudo:
continue
last_item_dependent_on_key.teardown = functools.partial(
lambda other_finalizers, new_finalizer: [finalizer() for finalizer in (new_finalizer, other_finalizers)],
last_item_dependent_on_key.teardown,
functools.partial(fixturedef.finish, last_item_dependent_on_key._request)
)
return reordered_items
def fix_cache_order(
item: nodes.Item,
argkeys_cache: Dict[Scope, Dict[nodes.Item, Dict[_Key, None]]],
items_by_argkey: Dict[Scope, Dict[_Key, "Deque[nodes.Item]"]],
argkeys_cache: Dict[Scope, Dict[nodes.Item, Dict[FixtureArgKey, None]]],
items_by_argkey: Dict[Scope, Dict[FixtureArgKey, "Deque[nodes.Item]"]],
ignore: Set[Optional[FixtureArgKey]],
current_scope: Scope
) -> None:
for scope in HIGH_SCOPES:
if current_scope < scope:
continue
for key in argkeys_cache[scope].get(item, []):
if key in ignore:
continue
items_by_argkey[scope][key].appendleft(item)
# Make sure last dependent item on a key
# remains updated while reordering.
if items_by_argkey[scope][key][-1] == item:
items_by_argkey[scope][key].pop()
def reorder_items_atscope(
items: Dict[nodes.Item, None],
argkeys_cache: Dict[Scope, Dict[nodes.Item, Dict[_Key, None]]],
items_by_argkey: Dict[Scope, Dict[_Key, "Deque[nodes.Item]"]],
argkeys_cache: Dict[Scope, Dict[nodes.Item, Dict[FixtureArgKey, None]]],
items_by_argkey: Dict[Scope, Dict[FixtureArgKey, "Deque[nodes.Item]"]],
scope: Scope,
) -> Dict[nodes.Item, None]:
if scope is Scope.Function or len(items) < 3:
return items
ignore: Set[Optional[_Key]] = set()
ignore: Set[Optional[FixtureArgKey]] = set()
items_deque = deque(items)
items_done: Dict[nodes.Item, None] = {}
scoped_items_by_argkey = items_by_argkey[scope]
@ -332,7 +355,7 @@ def reorder_items_atscope(
i for i in scoped_items_by_argkey[slicing_argkey] if i in items
]
for i in reversed(matching_items):
fix_cache_order(i, argkeys_cache, items_by_argkey)
fix_cache_order(i, argkeys_cache, items_by_argkey, ignore, scope)
items_deque.appendleft(i)
break
if no_argkey_group:
@ -345,10 +368,6 @@ def reorder_items_atscope(
return items_done
def get_direct_param_fixture_func(request: "FixtureRequest") -> Any:
return request.param
@dataclasses.dataclass
class FuncFixtureInfo:
__slots__ = ("argnames", "initialnames", "names_closure", "name2fixturedefs")
@ -891,7 +910,7 @@ def fail_fixturefunc(fixturefunc, msg: str) -> NoReturn:
def call_fixture_func(
fixturefunc: "_FixtureFunc[FixtureValue]", request: FixtureRequest, kwargs
fixturefunc: "_FixtureFunc[FixtureValue]", request: SubRequest, kwargs
) -> FixtureValue:
if is_generator(fixturefunc):
fixturefunc = cast(
@ -963,6 +982,7 @@ class FixtureDef(Generic[FixtureValue]):
ids: Optional[
Union[Tuple[Optional[object], ...], Callable[[Any], Optional[object]]]
] = None,
is_pseudo: bool = False,
) -> None:
self._fixturemanager = fixturemanager
# The "base" node ID for the fixture.
@ -1014,6 +1034,9 @@ class FixtureDef(Generic[FixtureValue]):
self.cached_result: Optional[_FixtureCachedResult[FixtureValue]] = None
self._finalizers: List[Callable[[], object]] = []
# Whether fixture is a pseudo-fixture made in direct parametrizations.
self.is_pseudo = is_pseudo
@property
def scope(self) -> "_ScopeName":
"""Scope string, one of "function", "class", "module", "package", "session"."""
@ -1572,6 +1595,9 @@ class FixtureManager:
# another fixture, while requesting the super fixture, keep going
# in case the super fixture is parametrized (#1953).
for fixturedef in reversed(fixture_defs):
# Skip pseudo-fixtures
if fixturedef.is_pseudo:
continue
# Fixture is parametrized, apply it and stop.
if fixturedef.params is not None:
metafunc.parametrize(

View File

@ -665,9 +665,10 @@ class Session(nodes.FSCollector):
self.items.extend(self.genitems(node))
self.config.pluginmanager.check_pending()
hook.pytest_collection_modifyitems(
session=self, config=self.config, items=items
)
if genitems:
hook.pytest_collection_modifyitems(
session=self, config=self.config, items=items
)
finally:
hook.pytest_collection_finish(session=self)

View File

@ -59,7 +59,12 @@ from _pytest.deprecated import check_ispytest
from _pytest.deprecated import FSCOLLECTOR_GETHOOKPROXY_ISINITPATH
from _pytest.deprecated import INSTANCE_COLLECTOR
from _pytest.deprecated import NOSE_SUPPORT_METHOD
from _pytest.fixtures import FuncFixtureInfo
from _pytest.fixtures import (FixtureDef,
FixtureRequest,
FuncFixtureInfo,
get_scope_node,
name2pseudofixturedef_key,
resolve_unique_values_and_their_indices_in_parametersets,)
from _pytest.main import Session
from _pytest.mark import MARK_GEN
from _pytest.mark import ParameterSet
@ -76,6 +81,7 @@ from _pytest.pathlib import ImportPathMismatchError
from _pytest.pathlib import parts
from _pytest.pathlib import visit
from _pytest.scope import Scope
from _pytest.stash import StashKey
from _pytest.warning_types import PytestCollectionWarning
from _pytest.warning_types import PytestReturnNotNoneWarning
from _pytest.warning_types import PytestUnhandledCoroutineWarning
@ -496,17 +502,12 @@ class PyCollector(PyobjMixin, nodes.Collector):
if cls is not None and hasattr(cls, "pytest_generate_tests"):
methods.append(cls().pytest_generate_tests)
self.ihook.pytest_generate_tests.call_extra(methods, dict(metafunc=metafunc))
if not metafunc._calls:
yield Function.from_parent(self, name=name, fixtureinfo=fixtureinfo)
else:
# Add funcargs() as fixturedefs to fixtureinfo.arg2fixturedefs.
fm = self.session._fixturemanager
fixtures.add_funcarg_pseudo_fixture_def(self, metafunc, fm)
# Add_funcarg_pseudo_fixture_def may have shadowed some fixtures
# with direct parametrization, so make sure we update what the
# function really needs.
# Direct parametrization may have shadowed some fixtures
# so make sure we update what the function really needs.
fixtureinfo.prune_dependency_tree()
for callspec in metafunc._calls:
@ -1146,32 +1147,23 @@ class CallSpec2:
def setmulti(
self,
*,
valtypes: Mapping[str, "Literal['params', 'funcargs']"],
argnames: Iterable[str],
valset: Iterable[object],
id: str,
marks: Iterable[Union[Mark, MarkDecorator]],
scope: Scope,
param_index: int,
param_indices: Tuple[int],
) -> "CallSpec2":
funcargs = self.funcargs.copy()
params = self.params.copy()
indices = self.indices.copy()
arg2scope = self._arg2scope.copy()
for arg, val in zip(argnames, valset):
if arg in params or arg in funcargs:
for arg, val, param_index in zip(argnames, valset, param_indices):
if arg in params:
raise ValueError(f"duplicate {arg!r}")
valtype_for_arg = valtypes[arg]
if valtype_for_arg == "params":
params[arg] = val
elif valtype_for_arg == "funcargs":
funcargs[arg] = val
else:
assert_never(valtype_for_arg)
params[arg] = val
indices[arg] = param_index
arg2scope[arg] = scope
return CallSpec2(
funcargs=funcargs,
params=params,
indices=indices,
_arg2scope=arg2scope,
@ -1190,6 +1182,10 @@ class CallSpec2:
return "-".join(self._idlist)
def get_direct_param_fixture_func(request: FixtureRequest) -> Any:
return request.param
@final
class Metafunc:
"""Objects passed to the :hook:`pytest_generate_tests` hook.
@ -1331,8 +1327,6 @@ class Metafunc:
self._validate_if_using_arg_names(argnames, indirect)
arg_values_types = self._resolve_arg_value_types(argnames, indirect)
# Use any already (possibly) generated ids with parametrize Marks.
if _param_mark and _param_mark._param_ids_from:
generated_ids = _param_mark._param_ids_from._param_ids_generated
@ -1342,27 +1336,67 @@ class Metafunc:
ids = self._resolve_parameter_set_ids(
argnames, ids, parametersets, nodeid=self.definition.nodeid
)
params_values, param_indices_list = resolve_unique_values_and_their_indices_in_parametersets(argnames, parametersets)
# Store used (possibly generated) ids with parametrize Marks.
if _param_mark and _param_mark._param_ids_from and generated_ids is None:
object.__setattr__(_param_mark._param_ids_from, "_param_ids_generated", ids)
# Add funcargs as fixturedefs to fixtureinfo.arg2fixturedefs by registering
# artificial FixtureDef's so that later at test execution time we can rely
# on a proper FixtureDef to exist for fixture setup.
arg2fixturedefs = self._arg2fixturedefs
node = None
if scope_ is not Scope.Function:
node = get_scope_node(self.definition.parent, scope_)
if node is None:
assert scope_ is Scope.Class and isinstance(
self.definition.parent, _pytest.python.Module
)
# Use module-level collector for class-scope (for now).
node = self.definition.parent
if node is None:
name2pseudofixturedef = None
else:
default: Dict[str, FixtureDef[Any]] = {}
name2pseudofixturedef = node.stash.setdefault(
name2pseudofixturedef_key, default
)
arg_values_types = self._resolve_arg_value_types(argnames, indirect)
for argname in argnames:
if arg_values_types[argname] == "params":
continue
if name2pseudofixturedef is not None and argname in name2pseudofixturedef:
arg2fixturedefs[argname] = [name2pseudofixturedef[argname]]
else:
fixturedef = FixtureDef(
fixturemanager=self.definition.session._fixturemanager,
baseid="",
argname=argname,
func=get_direct_param_fixture_func,
scope=scope_,
params=params_values[argname],
unittest=False,
ids=None,
is_pseudo=True
)
arg2fixturedefs[argname] = [fixturedef]
if name2pseudofixturedef is not None:
name2pseudofixturedef[argname] = fixturedef
# Create the new calls: if we are parametrize() multiple times (by applying the decorator
# more than once) then we accumulate those calls generating the cartesian product
# of all calls.
newcalls = []
for callspec in self._calls or [CallSpec2()]:
for param_index, (param_id, param_set) in enumerate(
zip(ids, parametersets)
):
for param_id, param_set, param_indices in zip(ids, parametersets, param_indices_list):
newcallspec = callspec.setmulti(
valtypes=arg_values_types,
argnames=argnames,
valset=param_set.values,
id=param_id,
marks=param_set.marks,
scope=scope_,
param_index=param_index,
param_indices=param_indices,
)
newcalls.append(newcallspec)
self._calls = newcalls

View File

@ -22,13 +22,13 @@ def checked_order():
assert order == [
("issue_519.py", "fix1", "arg1v1"),
("test_one[arg1v1-arg2v1]", "fix2", "arg2v1"),
("test_two[arg1v1-arg2v1]", "fix2", "arg2v1"),
("test_one[arg1v1-arg2v2]", "fix2", "arg2v2"),
("test_two[arg1v1-arg2v1]", "fix2", "arg2v1"),
("test_two[arg1v1-arg2v2]", "fix2", "arg2v2"),
("issue_519.py", "fix1", "arg1v2"),
("test_one[arg1v2-arg2v1]", "fix2", "arg2v1"),
("test_two[arg1v2-arg2v1]", "fix2", "arg2v1"),
("test_one[arg1v2-arg2v2]", "fix2", "arg2v2"),
("test_two[arg1v2-arg2v1]", "fix2", "arg2v1"),
("test_two[arg1v2-arg2v2]", "fix2", "arg2v2"),
]

View File

@ -12,6 +12,7 @@ from _pytest.monkeypatch import MonkeyPatch
from _pytest.pytester import get_public_names
from _pytest.pytester import Pytester
from _pytest.python import Function
from _pytest.scope import HIGH_SCOPES
def test_getfuncargnames_functions():
@ -4472,3 +4473,497 @@ def test_yield_fixture_with_no_value(pytester: Pytester) -> None:
result.assert_outcomes(errors=1)
result.stdout.fnmatch_lines([expected])
assert result.ret == ExitCode.TESTS_FAILED
def test_teardown_high_scope_fixture_at_last_dependent_item_simple(pytester: Pytester) -> None:
pytester.makepyfile(
"""
import pytest
@pytest.fixture(scope='module', params=[None])
def fixture():
yield
print("Tearing down fixture!")
def test_0(fixture):
pass
def test_1(fixture):
print("Running test_1!")
def test_2():
print("Running test_2!")
"""
)
result = pytester.runpytest("-s")
assert result.ret == 0
result.stdout.fnmatch_lines([
"*Running test_1!*",
"*Tearing down fixture!*",
"*Running test_2!*",
])
def test_teardown_high_scope_fixture_at_last_dependent_item_simple_2(pytester: Pytester) -> None:
pytester.makepyfile(
"""
import pytest
@pytest.fixture(scope='module', params=[None])
def fixture1():
yield
print("Tearing down fixture!")
@pytest.fixture(scope='module', params=[None])
def fixture2():
yield
print("Tearing down fixture!")
def test_0(fixture1):
pass
def test_1(fixture1, fixture2):
print("Running test_1!")
def test_2():
print("Running test_2!")
"""
)
result = pytester.runpytest("-s")
assert result.ret == 0
result.stdout.fnmatch_lines([
"*Running test_1!*",
"*Tearing down fixture!*",
"*Tearing down fixture!*",
"*Running test_2!*",
])
def test_teardown_high_scope_fixture_at_last_dependent_item_complex(pytester: Pytester) -> None:
pytester.makepyfile(
**{
"tests/conftest.py": "import pytest\n"
+ "\n".join(
[
textwrap.dedent(f"""
@pytest.fixture(scope='{scope.value}', params=[None])
def {scope.value}_scope_fixture(request):
yield None
print("Tearing down {scope.value}_scope_fixture")
""")
for scope in HIGH_SCOPES
]
),
"tests/test_module_a.py": """
class TestClass:
def test_class1(self, class_scope_fixture):
pass
def test_class2(self):
print("class_scope_fixture should have been torn down")
def test_class3(self, class_scope_fixture):
print("class_scope_fixture should'nt have been torn down")
def teardown_class(self):
print("Tearing down TestClass")
def test_module1(module_scope_fixture):
pass
def test_module2():
print("module_scope_fixture should have been torn down")
def teardown_module():
print("Tearing down test_module_a")
def test_package1(package_scope_fixture):
pass
""",
"tests/test_module_b.py": """
import pytest
def test_package2():
print("package_scope_fixture should have been torn down")
def test_session1(session_scope_fixture):
pass
def test_session2():
print("session_scope_fixture should have been torn down")
def test_session3(session_scope_fixture):
print("session_scope_fixture should'nt have been torn down")
""",
"tests/__init__.py": """
def teardown_module():
print("Tearing down package tests")
""",
}
)
result = pytester.runpytest("-s")
assert result.ret == 0
result.stdout.fnmatch_lines(
[
"*class_scope_fixture should'nt have been torn down*",
"*Tearing down class_scope_fixture*",
"*class_scope_fixture should have been torn down*",
"*Tearing down TestClass*",
"*Tearing down module_scope_fixture*",
"*module_scope_fixture should have been torn down*",
"*Tearing down test_module_a*",
"*Tearing down package_scope_fixture*",
"*package_scope_fixture should have been torn down*",
"*session_scope_fixture should'nt have been torn down*",
"*Tearing down session_scope_fixture*",
"*session_scope_fixture should have been torn down*",
"*Tearing down package tests*",
]
)
def test_reorder_with_nonparametrized_fixtures(pytester: Pytester):
path = pytester.makepyfile(
"""
import pytest
@pytest.fixture(scope='module')
def a():
return "a"
@pytest.fixture(scope='module')
def b():
return "b"
def test_0(a):
pass
def test_1(b):
pass
def test_2(a):
pass
def test_3(b):
pass
def test_4(b):
pass
"""
)
result = pytester.runpytest(path, "-q", "--collect-only")
result.stdout.fnmatch_lines([f"*test_{i}*" for i in [0, 2, 1, 3, 4]])
def test_reorder_with_both_parametrized_and_nonparametrized_fixtures(pytester: Pytester):
path = pytester.makepyfile(
"""
import pytest
@pytest.fixture(scope='module',params=[None])
def parametrized():
yield
@pytest.fixture(scope='module')
def nonparametrized():
yield
def test_0(parametrized, nonparametrized):
pass
def test_1():
pass
def test_2(nonparametrized):
pass
"""
)
result = pytester.runpytest(path, "-q", "--collect-only")
result.stdout.fnmatch_lines([f"*test_{i}*" for i in [0, 2, 1]])
def test_add_new_test_dependent_on_a_fixuture_and_use_nfplugin(pytester: Pytester):
test_module_string = """
import pytest
@pytest.fixture(scope='module')
def fixture():
yield
print("Tearing down fixture!")
def test_0(fixture):
pass
def test_1():
print("Running test_1!")
"""
path = pytester.makepyfile(test_module_string)
result = pytester.runpytest(path, "-s")
result.stdout.fnmatch_lines([
"*Tearing down fixture!*",
"*Running test_1!*"
])
test_module_string += """
def test_2(fixture):
pass
"""
path = pytester.makepyfile(test_module_string)
result = pytester.runpytest(path, "--new-first", "-s")
result.stdout.fnmatch_lines([
"*Tearing down fixture!*",
"*Running test_1!*",
"*Tearing down fixture!*",
])
def test_last_dependent_test_on_a_fixture_is_in_last_failed_using_lfplugin(pytester: Pytester):
test_module_string = """
import pytest
@pytest.fixture(scope='module')
def fixture():
yield
print("Tearing down fixture!")
def test_0(fixture):
print("Running test_0!")
assert {0}
def test_1(fixture):
print("Running test_1!")
assert True
def test_2():
print("Running test_2!")
assert {0}
"""
path = pytester.makepyfile(test_module_string.format("False"))
result = pytester.runpytest(path)
path = pytester.makepyfile(test_module_string.format("True"))
result = pytester.runpytest(path, "--last-failed", "-s")
result.stdout.fnmatch_lines([
"*Running test_0!*",
"*Running test_2!*",
"*Tearing down fixture!*",
])
@pytest.mark.xfail(reason="We do not attempt to tear down early the fixture that is overridden and also is used")
def test_early_teardown_of_overridden_and_being_used_fixture(pytester: Pytester) -> None:
pytester.makeconftest(
"""
import pytest
@pytest.fixture(scope='module')
def fixture0():
yield None
print("Tearing down higher-level fixture0")
"""
)
pytester.makepyfile(
"""
import pytest
@pytest.fixture(scope='module')
def fixture0(fixture0):
yield None
print("Tearing down lower-level fixture0")
def test_0(fixture0):
pass
def test_1():
print("Both `fixture0`s should have been torn down")
"""
)
result = pytester.runpytest("-s")
result.stdout.fnmatch_lines([
"*Tearing down lower-level fixture0*",
"*Tearing down higher-level fixture0*",
"*Both `fixture0`s should have been torn down*",
])
def test_basing_fixture_argkeys_on_param_values_rather_than_on_param_indices(pytester: Pytester):
pytester.makepyfile(
"""
import pytest
@pytest.fixture(scope='module')
def fixture1(request):
yield request.param
print(f"Tearing down fixture1 with param value `{request.param}`")
@pytest.mark.parametrize("fixture1",[1, 0],indirect=True)
def test_0(fixture1):
pass
@pytest.mark.parametrize("fixture1",[2, 1],indirect=True)
def test_1(fixture1):
pass
def test_2():
print("fixture1 should have been torn down 3 times")
@pytest.mark.parametrize("param", [0,1,2], scope='module')
def test_3(param):
pass
@pytest.mark.parametrize("param", [2,1,0], scope='module')
def test_4(param):
pass
""")
result = pytester.runpytest("--collect-only")
result.stdout.re_match_lines([
r" <Function test_0\[1\]>",
r" <Function test_1\[1\]>",
r" <Function test_0\[0\]>",
r" <Function test_1\[2\]>",
r" <Function test_2>",
r" <Function test_3\[0\]>",
r" <Function test_4\[0\]>",
r" <Function test_3\[1\]>",
r" <Function test_4\[1\]>",
r" <Function test_3\[2\]>",
r" <Function test_4\[2\]>",
])
result = pytester.runpytest("-s")
result.stdout.fnmatch_lines([
"*Tearing down fixture1 with param value `1`*",
"*Tearing down fixture1 with param value `0`*",
"*Tearing down fixture1 with param value `2`*",
"*fixture1 should have been torn down 3 times*",
])
def test_basing_fixture_argkeys_on_param_values_rather_than_on_param_indices_2(pytester: Pytester):
pytester.makepyfile(
"""
import pytest
@pytest.fixture(scope='module')
def fixture1(request):
yield request.param
print(f"Tearing down fixture1 with param value `{request.param}`")
@pytest.fixture(scope='module')
def fixture2(request):
yield request.param
print(f"Tearing down fixture2 with param value `{request.param}`")
@pytest.mark.parametrize("fixture1, fixture2", [("a", 0), ("b", 1), ("a", 2)], indirect=True)
def test_1(fixture1, fixture2):
pass
@pytest.mark.parametrize("fixture1, fixture2", [("c", 4), ("a", 3)], indirect=True)
def test_2(fixture1, fixture2):
pass
def test_3():
print("All fixtures should have been torn down")
@pytest.mark.parametrize("param1, param2", [("a", 0), ("b", 1), ("a", 2)], scope='module')
def test_4(param1, param2):
pass
@pytest.mark.parametrize("param1, param2", [("c", 4), ("a", 3)], scope='module')
def test_5(param1, param2):
pass
""")
result = pytester.runpytest("--collect-only")
result.stdout.re_match_lines([
r" <Function test_1\[a-0\]>",
r" <Function test_1\[a-2\]>",
r" <Function test_2\[a-3\]>",
r" <Function test_1\[b-1\]>",
r" <Function test_2\[c-4\]>",
r" <Function test_3>",
r" <Function test_4\[a-0\]>",
r" <Function test_4\[a-2\]>",
r" <Function test_5\[a-3\]>",
r" <Function test_4\[b-1\]>",
r" <Function test_5\[c-4\]>",
])
result = pytester.runpytest("-s")
result.stdout.fnmatch_lines([
"*Tearing down fixture2 with param value `0`*",
"*Tearing down fixture2 with param value `2`*",
"*Tearing down fixture2 with param value `3`*",
"*Tearing down fixture1 with param value `a`*",
"*Tearing down fixture2 with param value `1`*",
"*Tearing down fixture1 with param value `b`*",
"*Tearing down fixture2 with param value `4`*",
"*Tearing down fixture1 with param value `c`*",
"*All fixtures should have been torn down*",
])
def test_early_teardown_when_an_item_is_the_last_dependent_on_multiple_fixtures(pytester: Pytester):
pytester.makepyfile(
"""
import pytest
@pytest.fixture(scope='module')
def fixture1():
yield None
print("Tearing down fixture1")
@pytest.fixture(scope='module')
def fixture2():
yield None
print(f"Tearing down fixture2")
@pytest.fixture(scope='module')
def fixture3():
yield None
print(f"Tearing down fixture3")
def test_0(fixture1):
print("No fixture should have been torn down")
def test_1(fixture1, fixture2):
print("No fixture should have been torn down")
def test_2(fixture1, fixture2, fixture3):
print("No fixture should have been torn down")
def test_3(fixture1, fixture2, fixture3):
print("No fixture should have been torn down")
def test_4():
print("All fixtures should have been torn down")
""")
result = pytester.runpytest("-s")
result.stdout.fnmatch_lines([
"*No fixture should have been torn down*",
"*No fixture should have been torn down*",
"*No fixture should have been torn down*",
"*No fixture should have been torn down*",
"*Tearing down fixture3*",
"*Tearing down fixture2*",
"*Tearing down fixture1*",
"*All fixtures should have been torn down*",
])
def test_early_teardown_does_not_occur_for_pseudo_fixtures(pytester: Pytester) -> None:
"""
Check that early teardown does not occur for pseudo fixtures which are created in
directly parametrized tests with high scopes.
"""
pytester.makepyfile(
"""
import pytest
@pytest.mark.parametrize("param", [0,1,2], scope='module')
def test_0(param):
pass
@pytest.mark.parametrize("param", [0,1,2], scope='module')
def test_1(param):
pass
"""
)
items = pytester.inline_run().getcalls("pytest_collection_finish")[0].session.items
import functools
assert not any([isinstance(item.teardown, functools.partial) for item in items])

View File

@ -34,11 +34,19 @@ class TestMetafunc:
# on the funcarg level, so we don't need a full blown
# initialization.
class FuncFixtureInfoMock:
name2fixturedefs = None
name2fixturedefs = {}
def __init__(self, names):
self.names_closure = names
@dataclasses.dataclass
class FixtureManagerMock:
config: Any
@dataclasses.dataclass
class SessionMock:
_fixturemanager: FixtureManagerMock
@dataclasses.dataclass
class DefinitionMock(python.FunctionDefinition):
_nodeid: str
@ -47,6 +55,8 @@ class TestMetafunc:
names = getfuncargnames(func)
fixtureinfo: Any = FuncFixtureInfoMock(names)
definition: Any = DefinitionMock._create(obj=func, _nodeid="mock::nodeid")
definition.session = SessionMock(FixtureManagerMock({}))
return python.Metafunc(definition, fixtureinfo, config, _ispytest=True)
def test_no_funcargs(self) -> None:
@ -99,7 +109,7 @@ class TestMetafunc:
# When the input is an iterator, only len(args) are taken,
# so the bad Exc isn't reached.
metafunc.parametrize("x", [1, 2], ids=gen()) # type: ignore[arg-type]
assert [(x.funcargs, x.id) for x in metafunc._calls] == [
assert [(x.params, x.id) for x in metafunc._calls] == [
({"x": 1}, "0"),
({"x": 2}, "2"),
]
@ -726,8 +736,10 @@ class TestMetafunc:
metafunc = self.Metafunc(func)
metafunc.parametrize("x, y", [("a", "b")], indirect=["x"])
assert metafunc._calls[0].funcargs == dict(y="b")
assert metafunc._calls[0].params == dict(x="a")
assert metafunc._calls[0].params == dict(x="a", y="b")
# Since `y` is a direct parameter, its pseudo-fixture would
# be registered.
assert list(metafunc._arg2fixturedefs.keys()) == ["y"]
def test_parametrize_indirect_list_all(self) -> None:
"""#714"""
@ -739,6 +751,7 @@ class TestMetafunc:
metafunc.parametrize("x, y", [("a", "b")], indirect=["x", "y"])
assert metafunc._calls[0].funcargs == {}
assert metafunc._calls[0].params == dict(x="a", y="b")
assert list(metafunc._arg2fixturedefs.keys()) == []
def test_parametrize_indirect_list_empty(self) -> None:
"""#714"""
@ -748,8 +761,9 @@ class TestMetafunc:
metafunc = self.Metafunc(func)
metafunc.parametrize("x, y", [("a", "b")], indirect=[])
assert metafunc._calls[0].funcargs == dict(x="a", y="b")
assert metafunc._calls[0].params == {}
assert metafunc._calls[0].params == dict(x="a", y="b")
assert metafunc._calls[0].funcargs == {}
assert list(metafunc._arg2fixturedefs.keys()) == ["x", "y"]
def test_parametrize_indirect_wrong_type(self) -> None:
def func(x, y):
@ -943,9 +957,11 @@ class TestMetafunc:
metafunc = self.Metafunc(lambda x: None)
metafunc.parametrize("x", [1, 2])
assert len(metafunc._calls) == 2
assert metafunc._calls[0].funcargs == dict(x=1)
assert metafunc._calls[0].params == dict(x=1)
assert metafunc._calls[0].funcargs == {}
assert metafunc._calls[0].id == "1"
assert metafunc._calls[1].funcargs == dict(x=2)
assert metafunc._calls[1].params == dict(x=2)
assert metafunc._calls[1].funcargs == {}
assert metafunc._calls[1].id == "2"
def test_parametrize_onearg_indirect(self) -> None:
@ -960,11 +976,22 @@ class TestMetafunc:
metafunc = self.Metafunc(lambda x, y: None)
metafunc.parametrize(("x", "y"), [(1, 2), (3, 4)])
assert len(metafunc._calls) == 2
assert metafunc._calls[0].funcargs == dict(x=1, y=2)
assert metafunc._calls[0].params == dict(x=1, y=2)
assert metafunc._calls[0].funcargs == {}
assert metafunc._calls[0].id == "1-2"
assert metafunc._calls[1].funcargs == dict(x=3, y=4)
assert metafunc._calls[1].params == dict(x=3, y=4)
assert metafunc._calls[1].funcargs == {}
assert metafunc._calls[1].id == "3-4"
def test_parametrize_with_duplicate_values(self) -> None:
metafunc = self.Metafunc(lambda x, y: None)
metafunc.parametrize(("x", "y"), [(1, 2), (3, 4), (1, 5), (2, 2)])
assert len(metafunc._calls) == 4
assert metafunc._calls[0].indices == dict(x=0, y=0)
assert metafunc._calls[1].indices == dict(x=1, y=1)
assert metafunc._calls[2].indices == dict(x=0, y=2)
assert metafunc._calls[3].indices == dict(x=2, y=0)
def test_parametrize_multiple_times(self, pytester: Pytester) -> None:
pytester.makepyfile(
"""