This commit is contained in:
Sadra Barikbin 2024-01-03 03:11:06 -08:00 committed by GitHub
commit 14e765e843
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 311 additions and 73 deletions

View File

@ -17,6 +17,7 @@ from collections import defaultdict
from pathlib import Path
from pathlib import PurePath
from typing import Callable
from typing import DefaultDict
from typing import Dict
from typing import IO
from typing import Iterable
@ -668,9 +669,9 @@ class AssertionRewriter(ast.NodeVisitor):
else:
self.enable_assertion_pass_hook = False
self.source = source
self.scope: tuple[ast.AST, ...] = ()
self.variables_overwrite: defaultdict[
tuple[ast.AST, ...], Dict[str, str]
self.scope: Tuple[ast.AST, ...] = ()
self.variables_overwrite: DefaultDict[
Tuple[ast.AST, ...], Dict[str, str]
] = defaultdict(dict)
def run(self, mod: ast.Module) -> None:

View File

@ -314,33 +314,6 @@ class FuncFixtureInfo:
# sequence is ordered from furthest to closes to the function.
name2fixturedefs: Dict[str, Sequence["FixtureDef[Any]"]]
def prune_dependency_tree(self) -> None:
"""Recompute names_closure from initialnames and name2fixturedefs.
Can only reduce names_closure, which means that the new closure will
always be a subset of the old one. The order is preserved.
This method is needed because direct parametrization may shadow some
of the fixtures that were included in the originally built dependency
tree. In this way the dependency tree can get pruned, and the closure
of argnames may get reduced.
"""
closure: Set[str] = set()
working_set = set(self.initialnames)
while working_set:
argname = working_set.pop()
# Argname may be smth not included in the original names_closure,
# in which case we ignore it. This currently happens with pseudo
# FixtureDefs which wrap 'get_direct_param_fixture_func(request)'.
# So they introduce the new dependency 'request' which might have
# been missing in the original tree (closure).
if argname not in closure and argname in self.names_closure:
closure.add(argname)
if argname in self.name2fixturedefs:
working_set.update(self.name2fixturedefs[argname][-1].argnames)
self.names_closure[:] = sorted(closure, key=self.names_closure.index)
class FixtureRequest(abc.ABC):
"""The type of the ``request`` fixture.
@ -961,7 +934,6 @@ def _eval_scope_callable(
return result
@final
class FixtureDef(Generic[FixtureValue]):
"""A container for a fixture definition.
@ -1104,6 +1076,26 @@ class FixtureDef(Generic[FixtureValue]):
)
class IdentityFixtureDef(FixtureDef[FixtureValue]):
def __init__(
self,
fixturemanager: "FixtureManager",
argname: str,
scope: Union[Scope, _ScopeName, Callable[[str, Config], _ScopeName], None],
*,
_ispytest: bool = False,
):
super().__init__(
fixturemanager,
"",
argname,
lambda request: request.param,
scope,
None,
_ispytest=_ispytest,
)
def resolve_fixture_function(
fixturedef: FixtureDef[FixtureValue], request: FixtureRequest
) -> "_FixtureFunc[FixtureValue]":
@ -1486,10 +1478,11 @@ class FixtureManager:
initialnames = deduplicate_names(autousenames, usefixturesnames, argnames)
direct_parametrize_args = _get_direct_parametrize_args(node)
names_closure, arg2fixturedefs = self.getfixtureclosure(
arg2fixturedefs: Dict[str, Sequence[FixtureDef[Any]]] = {}
names_closure = self.getfixtureclosure(
parentnode=node,
initialnames=initialnames,
arg2fixturedefs=arg2fixturedefs,
ignore_args=direct_parametrize_args,
)
@ -1533,30 +1526,33 @@ class FixtureManager:
self,
parentnode: nodes.Node,
initialnames: Tuple[str, ...],
arg2fixturedefs: Dict[str, Sequence[FixtureDef[Any]]],
ignore_args: AbstractSet[str],
) -> Tuple[List[str], Dict[str, Sequence[FixtureDef[Any]]]]:
) -> List[str]:
# Collect the closure of all fixtures, starting with the given
# fixturenames as the initial set. As we have to visit all
# factory definitions anyway, we also return an arg2fixturedefs
# mapping so that the caller can reuse it and does not have
# to re-discover fixturedefs again for each fixturename
# initialnames containing function arguments, `usefixture` markers
# and `autouse` fixtures as the initial set. As we have to visit all
# factory definitions anyway, we also populate arg2fixturedefs mapping
# for the args missing therein so that the caller can reuse it and does
# not have to re-discover fixturedefs again for each fixturename
# (discovering matching fixtures for a given name/node is expensive).
parentid = parentnode.nodeid
fixturenames_closure = list(initialnames)
arg2fixturedefs: Dict[str, Sequence[FixtureDef[Any]]] = {}
lastlen = -1
while lastlen != len(fixturenames_closure):
lastlen = len(fixturenames_closure)
for argname in fixturenames_closure:
if argname in ignore_args:
continue
if argname in arg2fixturedefs:
continue
fixturedefs = self.getfixturedefs(argname, parentid)
if fixturedefs:
arg2fixturedefs[argname] = fixturedefs
if argname not in arg2fixturedefs:
fixturedefs = self.getfixturedefs(argname, parentid)
if fixturedefs:
arg2fixturedefs[argname] = fixturedefs
else:
fixturedefs = arg2fixturedefs[argname]
if fixturedefs and not isinstance(fixturedefs[-1], IdentityFixtureDef):
for arg in fixturedefs[-1].argnames:
if arg not in fixturenames_closure:
fixturenames_closure.append(arg)
@ -1570,7 +1566,7 @@ class FixtureManager:
return fixturedefs[-1]._scope
fixturenames_closure.sort(key=sort_by_scope, reverse=True)
return fixturenames_closure, arg2fixturedefs
return fixturenames_closure
def pytest_generate_tests(self, metafunc: "Metafunc") -> None:
"""Generate new tests based on parametrized fixtures used by the given metafunc"""

View File

@ -59,10 +59,10 @@ from _pytest.config.argparsing import Parser
from _pytest.deprecated import check_ispytest
from _pytest.deprecated import INSTANCE_COLLECTOR
from _pytest.deprecated import NOSE_SUPPORT_METHOD
from _pytest.fixtures import FixtureDef
from _pytest.fixtures import FixtureRequest
from _pytest.fixtures import _get_direct_parametrize_args
from _pytest.fixtures import FuncFixtureInfo
from _pytest.fixtures import get_scope_node
from _pytest.fixtures import IdentityFixtureDef
from _pytest.main import Session
from _pytest.mark import MARK_GEN
from _pytest.mark import ParameterSet
@ -488,8 +488,6 @@ class PyCollector(PyobjMixin, nodes.Collector, abc.ABC):
)
fixtureinfo = definition._fixtureinfo
# pytest_generate_tests impls call metafunc.parametrize() which fills
# metafunc._calls, the outcome of the hook.
metafunc = Metafunc(
definition=definition,
fixtureinfo=fixtureinfo,
@ -498,23 +496,32 @@ class PyCollector(PyobjMixin, nodes.Collector, abc.ABC):
module=module,
_ispytest=True,
)
methods = []
def prune_dependency_tree_if_test_is_directly_parametrized(metafunc: Metafunc):
# Direct (those with `indirect=False`) parametrizations taking place in
# module/class-specific `pytest_generate_tests` hooks, a.k.a dynamic direct
# parametrizations using `metafunc.parametrize`, may have shadowed some
# fixtures, making some fixtures no longer reachable. Update the dependency
# tree to reflect what the item really needs.
# Note that direct parametrizations using `@pytest.mark.parametrize` have
# already been considered into making the closure using the `ignore_args`
# arg to `getfixtureclosure`.
if metafunc._has_direct_parametrization:
metafunc._update_dependency_tree()
methods = [prune_dependency_tree_if_test_is_directly_parametrized]
if hasattr(module, "pytest_generate_tests"):
methods.append(module.pytest_generate_tests)
if cls is not None and hasattr(cls, "pytest_generate_tests"):
methods.append(cls().pytest_generate_tests)
# pytest_generate_tests impls call metafunc.parametrize() which fills
# metafunc._calls, the outcome of the hook.
self.ihook.pytest_generate_tests.call_extra(methods, dict(metafunc=metafunc))
if not metafunc._calls:
yield Function.from_parent(self, name=name, fixtureinfo=fixtureinfo)
else:
# Direct parametrizations taking place in module/class-specific
# `metafunc.parametrize` calls may have shadowed some fixtures, so make sure
# we update what the function really needs a.k.a its fixture closure. Note that
# direct parametrizations using `@pytest.mark.parametrize` have already been considered
# into making the closure using `ignore_args` arg to `getfixtureclosure`.
fixtureinfo.prune_dependency_tree()
for callspec in metafunc._calls:
subname = f"{name}[{callspec.id}]"
yield Function.from_parent(
@ -1166,12 +1173,8 @@ class CallSpec2:
return "-".join(self._idlist)
def get_direct_param_fixture_func(request: FixtureRequest) -> Any:
return request.param
# Used for storing pseudo fixturedefs for direct parametrization.
name2pseudofixturedef_key = StashKey[Dict[str, FixtureDef[Any]]]()
name2pseudofixturedef_key = StashKey[Dict[str, IdentityFixtureDef[Any]]]()
@final
@ -1218,6 +1221,9 @@ class Metafunc:
# Result of parametrize().
self._calls: List[CallSpec2] = []
# Whether it's ever been directly parametrized, i.e. with `indirect=False`.
self._has_direct_parametrization = False
def parametrize(
self,
argnames: Union[str, Sequence[str]],
@ -1358,7 +1364,7 @@ class Metafunc:
if node is None:
name2pseudofixturedef = None
else:
default: Dict[str, FixtureDef[Any]] = {}
default: Dict[str, IdentityFixtureDef[Any]] = {}
name2pseudofixturedef = node.stash.setdefault(
name2pseudofixturedef_key, default
)
@ -1366,18 +1372,14 @@ class Metafunc:
for argname in argnames:
if arg_directness[argname] == "indirect":
continue
self._has_direct_parametrization = True
if name2pseudofixturedef is not None and argname in name2pseudofixturedef:
fixturedef = name2pseudofixturedef[argname]
else:
fixturedef = FixtureDef(
fixturemanager=self.definition.session._fixturemanager,
baseid="",
argname=argname,
func=get_direct_param_fixture_func,
scope=scope_,
params=None,
unittest=False,
ids=None,
fixturedef = IdentityFixtureDef(
self.definition.session._fixturemanager,
argname,
scope_,
_ispytest=True,
)
if name2pseudofixturedef is not None:
@ -1543,6 +1545,18 @@ class Metafunc:
pytrace=False,
)
def _update_dependency_tree(self) -> None:
definition = self.definition
assert definition.parent is not None
fm = definition.parent.session._fixturemanager
fixture_closure = fm.getfixtureclosure(
parentnode=definition,
initialnames=definition._fixtureinfo.initialnames,
arg2fixturedefs=definition._fixtureinfo.name2fixturedefs,
ignore_args=_get_direct_parametrize_args(definition),
)
definition._fixtureinfo.names_closure[:] = fixture_closure
def _find_parametrized_scope(
argnames: Sequence[str],

View File

@ -4534,8 +4534,198 @@ def test_yield_fixture_with_no_value(pytester: Pytester) -> None:
assert result.ret == ExitCode.TESTS_FAILED
def test_fixture_info_after_dynamic_parametrize(pytester: Pytester) -> None:
"""
Item dependency tree should get prunned before `FixtureManager::pytest_generate_tests`
hook implementation because it attempts to parametrize on the fixtures in the
fixture closure.
"""
pytester.makeconftest(
"""
import pytest
@pytest.fixture(scope='session', params=[0, 1])
def fixture1(request):
pass
@pytest.fixture(scope='session')
def fixture2(fixture1):
pass
@pytest.fixture(scope='session', params=[2, 3])
def fixture3(request, fixture2):
pass
"""
)
pytester.makepyfile(
"""
import pytest
def pytest_generate_tests(metafunc):
metafunc.parametrize("fixture2", [4, 5], scope='session')
@pytest.fixture(scope='session')
def fixture2(fixture3):
pass
def test(fixture2):
assert fixture2 in (4, 5)
"""
)
res = pytester.runpytest()
res.assert_outcomes(passed=2)
def test_reordering_after_dynamic_parametrize(pytester: Pytester):
"""Make sure that prunning dependency tree takes place correctly, regarding from
reordering's viewpoint."""
pytester.makepyfile(
"""
import pytest
def pytest_generate_tests(metafunc):
if metafunc.definition.name == "test_0":
metafunc.parametrize("fixture2", [0])
@pytest.fixture(scope='module', params=[0])
def fixture1():
pass
@pytest.fixture(scope='module')
def fixture2(fixture1):
pass
def test_0(fixture2):
pass
def test_1():
pass
def test_2(fixture1):
pass
"""
)
result = pytester.runpytest("--collect-only")
result.stdout.fnmatch_lines(
[
"*test_0*",
"*test_1*",
"*test_2*",
],
consecutive=True,
)
def test_request_shouldnt_be_in_closure_after_pruning_dep_tree_when_its_not_in_initial_closure(
pytester: Pytester,
):
"""Make sure that fixture `request` doesn't show up in the closure after prunning dependency
tree when it has not been there beforehand.
"""
pytester.makepyfile(
"""
import pytest
def pytest_generate_tests(metafunc):
metafunc.parametrize("arg", [0])
@pytest.fixture()
def fixture():
pass
def test(fixture, arg):
pass
"""
)
result = pytester.runpytest("--setup-show")
result.stdout.re_match_lines(
[
r".+test\[0\] \(fixtures used: arg, fixture\)\.",
],
)
def test_dont_recompute_dependency_tree_if_no_direct_dynamic_parametrize(
pytester: Pytester,
):
"""We should not update item's dependency tree when there's no direct dynamic
parametrization, i.e. `metafunc.parametrize(indirect=False)`s in module/class specific
`pytest_generate_tests` hooks, for the sake of efficiency.
"""
pytester.makeconftest(
"""
import pytest
from _pytest.config import hookimpl
from unittest.mock import Mock
original_method = None
@hookimpl(trylast=True)
def pytest_sessionstart(session):
global original_method
original_method = session._fixturemanager.getfixtureclosure
session._fixturemanager.getfixtureclosure = Mock(wraps=original_method)
@hookimpl(tryfirst=True)
def pytest_sessionfinish(session, exitstatus):
global original_method
session._fixturemanager.getfixtureclosure = original_method
"""
)
pytester.makepyfile(
"""
import pytest
def pytest_generate_tests(metafunc):
if metafunc.definition.name == "test_0":
metafunc.parametrize("fixture", [0])
if metafunc.definition.name == "test_4":
metafunc.parametrize("fixture", [0], indirect=True)
@pytest.fixture(scope='module')
def fixture():
pass
def test_0(fixture):
pass
def test_1():
pass
@pytest.mark.parametrize("fixture", [0])
def test_2(fixture):
pass
@pytest.mark.parametrize("fixture", [0], indirect=True)
def test_3(fixture):
pass
def test_4(fixture):
pass
@pytest.fixture
def fm(request):
yield request._fixturemanager
def test(fm):
calls = fm.getfixtureclosure.call_args_list
assert len(calls) == 7
assert calls[0].kwargs["parentnode"].nodeid.endswith("test_0")
assert calls[1].kwargs["parentnode"].nodeid.endswith("test_0")
assert calls[2].kwargs["parentnode"].nodeid.endswith("test_1")
assert calls[3].kwargs["parentnode"].nodeid.endswith("test_2")
assert calls[4].kwargs["parentnode"].nodeid.endswith("test_3")
assert calls[5].kwargs["parentnode"].nodeid.endswith("test_4")
assert calls[6].kwargs["parentnode"].nodeid.endswith("test")
"""
)
reprec = pytester.runpytest()
reprec.assert_outcomes(passed=6)
def test_deduplicate_names() -> None:
items = deduplicate_names("abacd")
assert items == ("a", "b", "c", "d")
items = deduplicate_names(items + ("g", "f", "g", "e", "b"))
items = deduplicate_names(items, ("g", "f", "g", "e", "b"))
assert items == ("a", "b", "c", "d", "g", "f", "e")

View File

@ -1,3 +1,4 @@
import pytest
from _pytest.pytester import Pytester
@ -252,3 +253,39 @@ def test_verbose_include_multiline_docstring(pytester: Pytester) -> None:
" Docstring content that extends into a third paragraph.",
]
)
@pytest.mark.xfail(
reason="python.py::show_fixtures_per_test uses arg2fixturedefs instead of fixtureclosure"
)
def test_should_not_show_fixtures_pruned_after_dynamic_parametrization(
pytester: Pytester,
) -> None:
pytester.makepyfile(
"""
import pytest
@pytest.fixture
def fixture1():
pass
@pytest.fixture
def fixture2(fixture1):
pass
@pytest.fixture
def fixture3(fixture2):
pass
def pytest_generate_tests(metafunc):
metafunc.parametrize("fixture3", [0])
def test(fixture3):
pass
"""
)
result = pytester.runpytest("--fixtures-per-test")
result.stdout.re_match_lines(
[r"-+ fixtures used by test\[0\] -+", r"-+ \(.+\) -+", r"fixture3 -- .+"],
consecutive=True,
)