Do the improvement

This commit is contained in:
Sadra Barikbin 2023-07-23 03:05:59 +03:30
parent 556e075d23
commit aa5d09deef
3 changed files with 214 additions and 60 deletions

View File

@ -312,33 +312,6 @@ class FuncFixtureInfo:
# sequence is ordered from furthest to closes to the function. # sequence is ordered from furthest to closes to the function.
name2fixturedefs: Dict[str, Sequence["FixtureDef[Any]"]] name2fixturedefs: Dict[str, Sequence["FixtureDef[Any]"]]
def prune_dependency_tree(self) -> None:
"""Recompute names_closure from initialnames and name2fixturedefs.
Can only reduce names_closure, which means that the new closure will
always be a subset of the old one. The order is preserved.
This method is needed because direct parametrization may shadow some
of the fixtures that were included in the originally built dependency
tree. In this way the dependency tree can get pruned, and the closure
of argnames may get reduced.
"""
closure: Set[str] = set()
working_set = set(self.initialnames)
while working_set:
argname = working_set.pop()
# Argname may be smth not included in the original names_closure,
# in which case we ignore it. This currently happens with pseudo
# FixtureDefs which wrap 'get_direct_param_fixture_func(request)'.
# So they introduce the new dependency 'request' which might have
# been missing in the original tree (closure).
if argname not in closure and argname in self.names_closure:
closure.add(argname)
if argname in self.name2fixturedefs:
working_set.update(self.name2fixturedefs[argname][-1].argnames)
self.names_closure[:] = sorted(closure, key=self.names_closure.index)
class FixtureRequest: class FixtureRequest:
"""A request for a fixture from a test or fixture function. """A request for a fixture from a test or fixture function.
@ -1431,11 +1404,28 @@ class FixtureManager:
usefixtures = tuple( usefixtures = tuple(
arg for mark in node.iter_markers(name="usefixtures") for arg in mark.args arg for mark in node.iter_markers(name="usefixtures") for arg in mark.args
) )
initialnames = usefixtures + argnames initialnames = cast(
initialnames, names_closure, arg2fixturedefs = self.getfixtureclosure( Tuple[str],
initialnames, node, ignore_args=_get_direct_parametrize_args(node) tuple(
dict.fromkeys(
tuple(self._getautousenames(node.nodeid)) + usefixtures + argnames
)
),
)
arg2fixturedefs: Dict[str, Sequence[FixtureDef[Any]]] = {}
names_closure = self.getfixtureclosure(
node,
initialnames,
arg2fixturedefs,
ignore_args=_get_direct_parametrize_args(node),
)
return FuncFixtureInfo(
argnames,
initialnames,
names_closure,
arg2fixturedefs,
) )
return FuncFixtureInfo(argnames, initialnames, names_closure, arg2fixturedefs)
def pytest_plugin_registered(self, plugin: _PluggyPlugin) -> None: def pytest_plugin_registered(self, plugin: _PluggyPlugin) -> None:
nodeid = None nodeid = None
@ -1468,45 +1458,38 @@ class FixtureManager:
def getfixtureclosure( def getfixtureclosure(
self, self,
fixturenames: Tuple[str, ...],
parentnode: nodes.Node, parentnode: nodes.Node,
initialnames: Tuple[str],
arg2fixturedefs: Dict[str, Sequence[FixtureDef[Any]]],
ignore_args: Sequence[str] = (), ignore_args: Sequence[str] = (),
) -> Tuple[Tuple[str, ...], List[str], Dict[str, Sequence[FixtureDef[Any]]]]: ) -> List[str]:
# Collect the closure of all fixtures, starting with the given # Collect the closure of all fixtures, starting with the given
# fixturenames as the initial set. As we have to visit all # initialnames as the initial set. As we have to visit all
# factory definitions anyway, we also return an arg2fixturedefs # factory definitions anyway, we also populate arg2fixturedefs
# mapping so that the caller can reuse it and does not have # mapping so that the caller can reuse it and does not have
# to re-discover fixturedefs again for each fixturename # to re-discover fixturedefs again for each fixturename
# (discovering matching fixtures for a given name/node is expensive). # (discovering matching fixtures for a given name/node is expensive).
parentid = parentnode.nodeid fixturenames_closure = list(initialnames)
fixturenames_closure = list(self._getautousenames(parentid))
def merge(otherlist: Iterable[str]) -> None: def merge(otherlist: Iterable[str]) -> None:
for arg in otherlist: for arg in otherlist:
if arg not in fixturenames_closure: if arg not in fixturenames_closure:
fixturenames_closure.append(arg) fixturenames_closure.append(arg)
merge(fixturenames)
# At this point, fixturenames_closure contains what we call "initialnames",
# which is a set of fixturenames the function immediately requests. We
# need to return it as well, so save this.
initialnames = tuple(fixturenames_closure)
arg2fixturedefs: Dict[str, Sequence[FixtureDef[Any]]] = {}
lastlen = -1 lastlen = -1
parentid = parentnode.nodeid
while lastlen != len(fixturenames_closure): while lastlen != len(fixturenames_closure):
lastlen = len(fixturenames_closure) lastlen = len(fixturenames_closure)
for argname in fixturenames_closure: for argname in fixturenames_closure:
if argname in ignore_args: if argname in ignore_args:
continue continue
if argname in arg2fixturedefs: if argname not in arg2fixturedefs:
continue
fixturedefs = self.getfixturedefs(argname, parentid) fixturedefs = self.getfixturedefs(argname, parentid)
if fixturedefs: if fixturedefs:
arg2fixturedefs[argname] = fixturedefs arg2fixturedefs[argname] = fixturedefs
merge(fixturedefs[-1].argnames) if argname in arg2fixturedefs:
merge(arg2fixturedefs[argname][-1].argnames)
def sort_by_scope(arg_name: str) -> Scope: def sort_by_scope(arg_name: str) -> Scope:
try: try:
@ -1517,7 +1500,7 @@ class FixtureManager:
return fixturedefs[-1]._scope return fixturedefs[-1]._scope
fixturenames_closure.sort(key=sort_by_scope, reverse=True) fixturenames_closure.sort(key=sort_by_scope, reverse=True)
return initialnames, fixturenames_closure, arg2fixturedefs return fixturenames_closure
def pytest_generate_tests(self, metafunc: "Metafunc") -> None: def pytest_generate_tests(self, metafunc: "Metafunc") -> None:
"""Generate new tests based on parametrized fixtures used by the given metafunc""" """Generate new tests based on parametrized fixtures used by the given metafunc"""

View File

@ -11,6 +11,7 @@ import warnings
from collections import Counter from collections import Counter
from collections import defaultdict from collections import defaultdict
from functools import partial from functools import partial
from functools import wraps
from pathlib import Path from pathlib import Path
from typing import Any from typing import Any
from typing import Callable from typing import Callable
@ -60,6 +61,7 @@ from _pytest.deprecated import INSTANCE_COLLECTOR
from _pytest.deprecated import NOSE_SUPPORT_METHOD from _pytest.deprecated import NOSE_SUPPORT_METHOD
from _pytest.fixtures import FixtureDef from _pytest.fixtures import FixtureDef
from _pytest.fixtures import FixtureRequest from _pytest.fixtures import FixtureRequest
from _pytest.fixtures import _get_direct_parametrize_args
from _pytest.fixtures import FuncFixtureInfo from _pytest.fixtures import FuncFixtureInfo
from _pytest.fixtures import get_scope_node from _pytest.fixtures import get_scope_node
from _pytest.main import Session from _pytest.main import Session
@ -380,6 +382,23 @@ del _EmptyClass
# fmt: on # fmt: on
def unwrap_metafunc_parametrize_and_possibly_prune_dependency_tree(metafunc):
metafunc.parametrize = metafunc._parametrize
del metafunc._parametrize
if metafunc.has_dynamic_parametrize:
# Dynamic direct parametrization may have shadowed some fixtures
# so make sure we update what the function really needs.
definition = metafunc.definition
fixture_closure = definition.parent.session._fixturemanager.getfixtureclosure(
definition,
definition._fixtureinfo.initialnames,
definition._fixtureinfo.name2fixturedefs,
ignore_args=_get_direct_parametrize_args(definition) + ["request"],
)
definition._fixtureinfo.names_closure[:] = fixture_closure
del metafunc.has_dynamic_parametrize
class PyCollector(PyobjMixin, nodes.Collector): class PyCollector(PyobjMixin, nodes.Collector):
def funcnamefilter(self, name: str) -> bool: def funcnamefilter(self, name: str) -> bool:
return self._matches_prefix_or_glob_option("python_functions", name) return self._matches_prefix_or_glob_option("python_functions", name)
@ -476,8 +495,6 @@ class PyCollector(PyobjMixin, nodes.Collector):
definition = FunctionDefinition.from_parent(self, name=name, callobj=funcobj) definition = FunctionDefinition.from_parent(self, name=name, callobj=funcobj)
fixtureinfo = definition._fixtureinfo fixtureinfo = definition._fixtureinfo
# pytest_generate_tests impls call metafunc.parametrize() which fills
# metafunc._calls, the outcome of the hook.
metafunc = Metafunc( metafunc = Metafunc(
definition=definition, definition=definition,
fixtureinfo=fixtureinfo, fixtureinfo=fixtureinfo,
@ -486,22 +503,29 @@ class PyCollector(PyobjMixin, nodes.Collector):
module=module, module=module,
_ispytest=True, _ispytest=True,
) )
methods = [] methods = [unwrap_metafunc_parametrize_and_possibly_prune_dependency_tree]
if hasattr(module, "pytest_generate_tests"): if hasattr(module, "pytest_generate_tests"):
methods.append(module.pytest_generate_tests) methods.append(module.pytest_generate_tests)
if cls is not None and hasattr(cls, "pytest_generate_tests"): if cls is not None and hasattr(cls, "pytest_generate_tests"):
methods.append(cls().pytest_generate_tests) methods.append(cls().pytest_generate_tests)
setattr(metafunc, "has_dynamic_parametrize", False)
@wraps(metafunc.parametrize)
def set_has_dynamic_parametrize(*args, **kwargs):
setattr(metafunc, "has_dynamic_parametrize", True)
metafunc._parametrize(*args, **kwargs) # type: ignore[attr-defined]
setattr(metafunc, "_parametrize", metafunc.parametrize)
setattr(metafunc, "parametrize", set_has_dynamic_parametrize)
# pytest_generate_tests impls call metafunc.parametrize() which fills
# metafunc._calls, the outcome of the hook.
self.ihook.pytest_generate_tests.call_extra(methods, dict(metafunc=metafunc)) self.ihook.pytest_generate_tests.call_extra(methods, dict(metafunc=metafunc))
if not metafunc._calls: if not metafunc._calls:
yield Function.from_parent(self, name=name, fixtureinfo=fixtureinfo) yield Function.from_parent(self, name=name, fixtureinfo=fixtureinfo)
else: else:
# Direct parametrizations taking place in module/class-specific
# `metafunc.parametrize` calls may have shadowed some fixtures, so make sure
# we update what the function really needs a.k.a its fixture closure. Note that
# direct parametrizations using `@pytest.mark.parametrize` have already been considered
# into making the closure using `ignore_args` arg to `getfixtureclosure`.
fixtureinfo.prune_dependency_tree()
for callspec in metafunc._calls: for callspec in metafunc._calls:
subname = f"{name}[{callspec.id}]" subname = f"{name}[{callspec.id}]"

View File

@ -4534,3 +4534,150 @@ def test_yield_fixture_with_no_value(pytester: Pytester) -> None:
result.assert_outcomes(errors=1) result.assert_outcomes(errors=1)
result.stdout.fnmatch_lines([expected]) result.stdout.fnmatch_lines([expected])
assert result.ret == ExitCode.TESTS_FAILED assert result.ret == ExitCode.TESTS_FAILED
@pytest.mark.xfail(
reason="arg2fixturedefs should get updated on dynamic parametrize. This gets solved by PR#11220"
)
def test_fixture_info_after_dynamic_parametrize(pytester: Pytester) -> None:
pytester.makeconftest(
"""
import pytest
@pytest.fixture(scope='session', params=[0, 1])
def fixture1(request):
pass
@pytest.fixture(scope='session')
def fixture2(fixture1):
pass
@pytest.fixture(scope='session', params=[2, 3])
def fixture3(request, fixture2):
pass
"""
)
pytester.makepyfile(
"""
import pytest
def pytest_generate_tests(metafunc):
metafunc.parametrize("fixture2", [4, 5], scope='session')
@pytest.fixture(scope='session')
def fixture4():
pass
@pytest.fixture(scope='session')
def fixture2(fixture3, fixture4):
pass
def test(fixture2):
assert fixture2 in (4, 5)
"""
)
res = pytester.inline_run("-s")
res.assertoutcome(passed=2)
def test_reordering_after_dynamic_parametrize(pytester: Pytester):
pytester.makepyfile(
"""
import pytest
def pytest_generate_tests(metafunc):
if metafunc.definition.name == "test_0":
metafunc.parametrize("fixture2", [0])
@pytest.fixture(scope='module')
def fixture1():
pass
@pytest.fixture(scope='module')
def fixture2(fixture1):
pass
def test_0(fixture2):
pass
def test_1():
pass
def test_2(fixture1):
pass
"""
)
result = pytester.runpytest("--collect-only")
result.stdout.fnmatch_lines(
[
"*test_0*",
"*test_1*",
"*test_2*",
],
consecutive=True,
)
def test_dont_recompute_dependency_tree_if_no_dynamic_parametrize(pytester: Pytester):
pytester.makeconftest(
"""
import pytest
from _pytest.config import hookimpl
from unittest.mock import Mock
original_method = None
@hookimpl(trylast=True)
def pytest_sessionstart(session):
global original_method
original_method = session._fixturemanager.getfixtureclosure
session._fixturemanager.getfixtureclosure = Mock(wraps=original_method)
@hookimpl(tryfirst=True)
def pytest_sessionfinish(session, exitstatus):
global original_method
session._fixturemanager.getfixtureclosure = original_method
"""
)
pytester.makepyfile(
"""
import pytest
def pytest_generate_tests(metafunc):
if metafunc.definition.name == "test_0":
metafunc.parametrize("fixture", [0])
@pytest.fixture(scope='module')
def fixture():
pass
def test_0(fixture):
pass
def test_1():
pass
@pytest.mark.parametrize("fixture", [0])
def test_2(fixture):
pass
@pytest.mark.parametrize("fixture", [0], indirect=True)
def test_3(fixture):
pass
@pytest.fixture
def fm(request):
yield request._fixturemanager
def test(fm):
method = fm.getfixtureclosure
assert len(method.call_args_list) == 6
assert method.call_args_list[0].args[0].nodeid.endswith("test_0")
assert method.call_args_list[1].args[0].nodeid.endswith("test_0")
assert method.call_args_list[2].args[0].nodeid.endswith("test_1")
assert method.call_args_list[3].args[0].nodeid.endswith("test_2")
assert method.call_args_list[4].args[0].nodeid.endswith("test_3")
assert method.call_args_list[5].args[0].nodeid.endswith("test")
"""
)
reprec = pytester.inline_run()
reprec.assertoutcome(passed=5)