Merge pull request #11646 from bluetech/pkg-collect

Rework Session and Package collection
This commit is contained in:
Ran Benita
2023-12-30 12:51:48 +02:00
committed by GitHub
40 changed files with 998 additions and 346 deletions

View File

@@ -27,8 +27,8 @@ from _pytest.deprecated import check_ispytest
from _pytest.fixtures import fixture
from _pytest.fixtures import FixtureRequest
from _pytest.main import Session
from _pytest.nodes import Directory
from _pytest.nodes import File
from _pytest.python import Package
from _pytest.reports import TestReport
README_CONTENT = """\
@@ -222,7 +222,7 @@ class LFPluginCollWrapper:
self, collector: nodes.Collector
) -> Generator[None, CollectReport, CollectReport]:
res = yield
if isinstance(collector, (Session, Package)):
if isinstance(collector, (Session, Directory)):
# Sort any lf-paths to the beginning.
lf_paths = self.lfplugin._last_failed_paths

View File

@@ -415,8 +415,6 @@ class PytestPluginManager(PluginManager):
# session (#9478), often with the same path, so cache it.
self._get_directory = lru_cache(256)(_get_directory)
self._duplicatepaths: Set[Path] = set()
# plugins that were explicitly skipped with pytest.skip
# list of (module name, skip reason)
# previously we would issue a warning when a plugin was skipped, but

View File

@@ -284,11 +284,35 @@ def pytest_ignore_collect(
"""
@hookspec(firstresult=True)
def pytest_collect_directory(path: Path, parent: "Collector") -> "Optional[Collector]":
"""Create a :class:`~pytest.Collector` for the given directory, or None if
not relevant.
.. versionadded:: 8.0
For best results, the returned collector should be a subclass of
:class:`~pytest.Directory`, but this is not required.
The new node needs to have the specified ``parent`` as a parent.
Stops at first non-None result, see :ref:`firstresult`.
:param path: The path to analyze.
See :ref:`custom directory collectors` for a simple example of use of this
hook.
"""
def pytest_collect_file(
file_path: Path, path: "LEGACY_PATH", parent: "Collector"
) -> "Optional[Collector]":
"""Create a :class:`~pytest.Collector` for the given path, or None if not relevant.
For best results, the returned collector should be a subclass of
:class:`~pytest.File`, but this is not required.
The new node needs to have the specified ``parent`` as a parent.
:param file_path: The path to analyze.

View File

@@ -12,6 +12,7 @@ from typing import Callable
from typing import Dict
from typing import final
from typing import FrozenSet
from typing import Iterable
from typing import Iterator
from typing import List
from typing import Literal
@@ -19,8 +20,6 @@ from typing import Optional
from typing import overload
from typing import Sequence
from typing import Tuple
from typing import Type
from typing import TYPE_CHECKING
from typing import Union
import pluggy
@@ -41,17 +40,13 @@ from _pytest.pathlib import absolutepath
from _pytest.pathlib import bestrelpath
from _pytest.pathlib import fnmatch_ex
from _pytest.pathlib import safe_exists
from _pytest.pathlib import visit
from _pytest.pathlib import scandir
from _pytest.reports import CollectReport
from _pytest.reports import TestReport
from _pytest.runner import collect_one_node
from _pytest.runner import SetupState
if TYPE_CHECKING:
from _pytest.python import Package
def pytest_addoption(parser: Parser) -> None:
parser.addini(
"norecursedirs",
@@ -414,6 +409,12 @@ def pytest_ignore_collect(collection_path: Path, config: Config) -> Optional[boo
return None
def pytest_collect_directory(
path: Path, parent: nodes.Collector
) -> Optional[nodes.Collector]:
return Dir.from_parent(parent, path=path)
def pytest_collection_modifyitems(items: List[nodes.Item], config: Config) -> None:
deselect_prefixes = tuple(config.getoption("deselect") or [])
if not deselect_prefixes:
@@ -470,7 +471,60 @@ class _bestrelpath_cache(Dict[Path, str]):
@final
class Session(nodes.FSCollector):
class Dir(nodes.Directory):
"""Collector of files in a file system directory.
.. versionadded:: 8.0
.. note::
Python directories with an `__init__.py` file are instead collected by
:class:`~pytest.Package` by default. Both are :class:`~pytest.Directory`
collectors.
"""
@classmethod
def from_parent( # type: ignore[override]
cls,
parent: nodes.Collector, # type: ignore[override]
*,
path: Path,
) -> "Dir":
"""The public constructor.
:param parent: The parent collector of this Dir.
:param path: The directory's path.
"""
return super().from_parent(parent=parent, path=path) # type: ignore[no-any-return]
def collect(self) -> Iterable[Union[nodes.Item, nodes.Collector]]:
config = self.config
col: Optional[nodes.Collector]
cols: Sequence[nodes.Collector]
ihook = self.ihook
for direntry in scandir(self.path):
if direntry.is_dir():
if direntry.name == "__pycache__":
continue
path = Path(direntry.path)
if not self.session.isinitpath(path, with_parents=True):
if ihook.pytest_ignore_collect(collection_path=path, config=config):
continue
col = ihook.pytest_collect_directory(path=path, parent=self)
if col is not None:
yield col
elif direntry.is_file():
path = Path(direntry.path)
if not self.session.isinitpath(path):
if ihook.pytest_ignore_collect(collection_path=path, config=config):
continue
cols = ihook.pytest_collect_file(file_path=path, parent=self)
yield from cols
@final
class Session(nodes.Collector):
"""The root of the collection tree.
``Session`` collects the initial paths given as arguments to pytest.
@@ -486,6 +540,7 @@ class Session(nodes.FSCollector):
def __init__(self, config: Config) -> None:
super().__init__(
name="",
path=config.rootpath,
fspath=None,
parent=None,
@@ -499,6 +554,11 @@ class Session(nodes.FSCollector):
self.shouldfail: Union[bool, str] = False
self.trace = config.trace.root.get("collection")
self._initialpaths: FrozenSet[Path] = frozenset()
self._initialpaths_with_parents: FrozenSet[Path] = frozenset()
self._notfound: List[Tuple[str, Sequence[nodes.Collector]]] = []
self._initial_parts: List[Tuple[Path, List[str]]] = []
self._collection_cache: Dict[nodes.Collector, CollectReport] = {}
self.items: List[nodes.Item] = []
self._bestrelpathcache: Dict[Path, str] = _bestrelpath_cache(config.rootpath)
@@ -549,10 +609,29 @@ class Session(nodes.FSCollector):
pytest_collectreport = pytest_runtest_logreport
def isinitpath(self, path: Union[str, "os.PathLike[str]"]) -> bool:
def isinitpath(
self,
path: Union[str, "os.PathLike[str]"],
*,
with_parents: bool = False,
) -> bool:
"""Is path an initial path?
An initial path is a path explicitly given to pytest on the command
line.
:param with_parents:
If set, also return True if the path is a parent of an initial path.
.. versionchanged:: 8.0
Added the ``with_parents`` parameter.
"""
# Optimization: Path(Path(...)) is much slower than isinstance.
path_ = path if isinstance(path, Path) else Path(path)
return path_ in self._initialpaths
if with_parents:
return path_ in self._initialpaths_with_parents
else:
return path_ in self._initialpaths
def gethookproxy(self, fspath: "os.PathLike[str]") -> pluggy.HookRelay:
# Optimization: Path(Path(...)) is much slower than isinstance.
@@ -560,15 +639,6 @@ class Session(nodes.FSCollector):
pm = self.config.pluginmanager
# Check if we have the common case of running
# hooks with all conftest.py files.
#
# TODO: pytest relies on this call to load non-initial conftests. This
# is incidental. It will be better to load conftests at a more
# well-defined place.
pm._loadconftestmodules(
path,
self.config.getoption("importmode"),
rootpath=self.config.rootpath,
)
my_conftestmodules = pm._getconftestmodules(path)
remove_mods = pm._conftest_plugins.difference(my_conftestmodules)
proxy: pluggy.HookRelay
@@ -580,49 +650,36 @@ class Session(nodes.FSCollector):
proxy = self.config.hook
return proxy
def _recurse(self, direntry: "os.DirEntry[str]") -> bool:
if direntry.name == "__pycache__":
return False
fspath = Path(direntry.path)
ihook = self.gethookproxy(fspath.parent)
if ihook.pytest_ignore_collect(collection_path=fspath, config=self.config):
return False
return True
def _collectpackage(self, fspath: Path) -> Optional["Package"]:
from _pytest.python import Package
ihook = self.gethookproxy(fspath)
if not self.isinitpath(fspath):
if ihook.pytest_ignore_collect(collection_path=fspath, config=self.config):
return None
pkg: Package = Package.from_parent(self, path=fspath)
return pkg
def _collectfile(
self, fspath: Path, handle_dupes: bool = True
def _collect_path(
self,
path: Path,
path_cache: Dict[Path, Sequence[nodes.Collector]],
) -> Sequence[nodes.Collector]:
assert (
fspath.is_file()
), "{!r} is not a file (isdir={!r}, exists={!r}, islink={!r})".format(
fspath, fspath.is_dir(), fspath.exists(), fspath.is_symlink()
)
ihook = self.gethookproxy(fspath)
if not self.isinitpath(fspath):
if ihook.pytest_ignore_collect(collection_path=fspath, config=self.config):
return ()
"""Create a Collector for the given path.
if handle_dupes:
keepduplicates = self.config.getoption("keepduplicates")
if not keepduplicates:
duplicate_paths = self.config.pluginmanager._duplicatepaths
if fspath in duplicate_paths:
return ()
else:
duplicate_paths.add(fspath)
`path_cache` makes it so the same Collectors are returned for the same
path.
"""
if path in path_cache:
return path_cache[path]
return ihook.pytest_collect_file(file_path=fspath, parent=self) # type: ignore[no-any-return]
if path.is_dir():
ihook = self.gethookproxy(path.parent)
col: Optional[nodes.Collector] = ihook.pytest_collect_directory(
path=path, parent=self
)
cols: Sequence[nodes.Collector] = (col,) if col is not None else ()
elif path.is_file():
ihook = self.gethookproxy(path)
cols = ihook.pytest_collect_file(file_path=path, parent=self)
else:
# Broken symlink or invalid/missing file.
cols = ()
path_cache[path] = cols
return cols
@overload
def perform_collect(
@@ -658,15 +715,16 @@ class Session(nodes.FSCollector):
self.trace("perform_collect", self, args)
self.trace.root.indent += 1
self._notfound: List[Tuple[str, Sequence[nodes.Collector]]] = []
self._initial_parts: List[Tuple[Path, List[str]]] = []
self.items: List[nodes.Item] = []
hook = self.config.hook
self._notfound = []
self._initial_parts = []
self._collection_cache = {}
self.items = []
items: Sequence[Union[nodes.Item, nodes.Collector]] = self.items
try:
initialpaths: List[Path] = []
initialpaths_with_parents: List[Path] = []
for arg in args:
fspath, parts = resolve_collection_argument(
self.config.invocation_params.dir,
@@ -675,7 +733,11 @@ class Session(nodes.FSCollector):
)
self._initial_parts.append((fspath, parts))
initialpaths.append(fspath)
initialpaths_with_parents.append(fspath)
initialpaths_with_parents.extend(fspath.parents)
self._initialpaths = frozenset(initialpaths)
self._initialpaths_with_parents = frozenset(initialpaths_with_parents)
rep = collect_one_node(self)
self.ihook.pytest_collectreport(report=rep)
self.trace.root.indent -= 1
@@ -684,12 +746,13 @@ class Session(nodes.FSCollector):
for arg, collectors in self._notfound:
if collectors:
errors.append(
f"not found: {arg}\n(no name {arg!r} in any of {collectors!r})"
f"not found: {arg}\n(no match in any of {collectors!r})"
)
else:
errors.append(f"found no collectors for {arg}")
raise UsageError(*errors)
if not genitems:
items = rep.result
else:
@@ -702,22 +765,34 @@ class Session(nodes.FSCollector):
session=self, config=self.config, items=items
)
finally:
self._notfound = []
self._initial_parts = []
self._collection_cache = {}
hook.pytest_collection_finish(session=self)
self.testscollected = len(items)
if genitems:
self.testscollected = len(items)
return items
def _collect_one_node(
self,
node: nodes.Collector,
handle_dupes: bool = True,
) -> Tuple[CollectReport, bool]:
if node in self._collection_cache and handle_dupes:
rep = self._collection_cache[node]
return rep, True
else:
rep = collect_one_node(node)
self._collection_cache[node] = rep
return rep, False
def collect(self) -> Iterator[Union[nodes.Item, nodes.Collector]]:
# Keep track of any collected nodes in here, so we don't duplicate fixtures.
node_cache1: Dict[Path, Sequence[nodes.Collector]] = {}
node_cache2: Dict[Tuple[Type[nodes.Collector], Path], nodes.Collector] = {}
# Keep track of any collected collectors in matchnodes paths, so they
# are not collected more than once.
matchnodes_cache: Dict[Tuple[Type[nodes.Collector], str], CollectReport] = {}
# Directories of pkgs with dunder-init files.
pkg_roots: Dict[Path, "Package"] = {}
# This is a cache for the root directories of the initial paths.
# We can't use collection_cache for Session because of its special
# role as the bootstrapping collector.
path_cache: Dict[Path, Sequence[nodes.Collector]] = {}
pm = self.config.pluginmanager
@@ -725,108 +800,87 @@ class Session(nodes.FSCollector):
self.trace("processing argument", (argpath, names))
self.trace.root.indent += 1
# Start with a Session root, and delve to argpath item (dir or file)
# and stack all Packages found on the way.
for parent in (argpath, *argpath.parents):
if not pm._is_in_confcutdir(argpath):
break
if parent.is_dir():
pkginit = parent / "__init__.py"
if pkginit.is_file() and parent not in node_cache1:
pkg = self._collectpackage(parent)
if pkg is not None:
pkg_roots[parent] = pkg
node_cache1[pkg.path] = [pkg]
# If it's a directory argument, recurse and look for any Subpackages.
# Let the Package collector deal with subnodes, don't collect here.
# resolve_collection_argument() ensures this.
if argpath.is_dir():
assert not names, f"invalid arg {(argpath, names)!r}"
if argpath in pkg_roots:
yield pkg_roots[argpath]
# Match the argpath from the root, e.g.
# /a/b/c.py -> [/, /a, /a/b, /a/b/c.py]
paths = [*reversed(argpath.parents), argpath]
# Paths outside of the confcutdir should not be considered, unless
# it's the argpath itself.
while len(paths) > 1 and not pm._is_in_confcutdir(paths[0]):
paths = paths[1:]
for direntry in visit(argpath, self._recurse):
path = Path(direntry.path)
if direntry.is_dir() and self._recurse(direntry):
pkginit = path / "__init__.py"
if pkginit.is_file():
pkg = self._collectpackage(path)
if pkg is not None:
yield pkg
pkg_roots[path] = pkg
# Start going over the parts from the root, collecting each level
# and discarding all nodes which don't match the level's part.
any_matched_in_initial_part = False
notfound_collectors = []
work: List[
Tuple[Union[nodes.Collector, nodes.Item], List[Union[Path, str]]]
] = [(self, paths + names)]
while work:
matchnode, matchparts = work.pop()
elif direntry.is_file():
if path.parent in pkg_roots:
# Package handles this file.
continue
for x in self._collectfile(path):
key2 = (type(x), x.path)
if key2 in node_cache2:
yield node_cache2[key2]
else:
node_cache2[key2] = x
yield x
else:
assert argpath.is_file()
if argpath in node_cache1:
col = node_cache1[argpath]
else:
collect_root = pkg_roots.get(argpath.parent, self)
col = collect_root._collectfile(argpath, handle_dupes=False)
if col:
node_cache1[argpath] = col
matching = []
work: List[
Tuple[Sequence[Union[nodes.Item, nodes.Collector]], Sequence[str]]
] = [(col, names)]
while work:
self.trace("matchnodes", col, names)
self.trace.root.indent += 1
matchnodes, matchnames = work.pop()
for node in matchnodes:
if not matchnames:
matching.append(node)
continue
if not isinstance(node, nodes.Collector):
continue
key = (type(node), node.nodeid)
if key in matchnodes_cache:
rep = matchnodes_cache[key]
else:
rep = collect_one_node(node)
matchnodes_cache[key] = rep
if rep.passed:
submatchnodes = []
for r in rep.result:
# TODO: Remove parametrized workaround once collection structure contains
# parametrization.
if (
r.name == matchnames[0]
or r.name.split("[")[0] == matchnames[0]
):
submatchnodes.append(r)
if submatchnodes:
work.append((submatchnodes, matchnames[1:]))
else:
# Report collection failures here to avoid failing to run some test
# specified in the command line because the module could not be
# imported (#134).
node.ihook.pytest_collectreport(report=rep)
self.trace("matchnodes finished -> ", len(matching), "nodes")
self.trace.root.indent -= 1
if not matching:
report_arg = "::".join((str(argpath), *names))
self._notfound.append((report_arg, col))
# Pop'd all of the parts, this is a match.
if not matchparts:
yield matchnode
any_matched_in_initial_part = True
continue
yield from matching
# Should have been matched by now, discard.
if not isinstance(matchnode, nodes.Collector):
continue
# Collect this level of matching.
# Collecting Session (self) is done directly to avoid endless
# recursion to this function.
subnodes: Sequence[Union[nodes.Collector, nodes.Item]]
if isinstance(matchnode, Session):
assert isinstance(matchparts[0], Path)
subnodes = matchnode._collect_path(matchparts[0], path_cache)
else:
# For backward compat, files given directly multiple
# times on the command line should not be deduplicated.
handle_dupes = not (
len(matchparts) == 1
and isinstance(matchparts[0], Path)
and matchparts[0].is_file()
)
rep, duplicate = self._collect_one_node(matchnode, handle_dupes)
if not duplicate and not rep.passed:
# Report collection failures here to avoid failing to
# run some test specified in the command line because
# the module could not be imported (#134).
matchnode.ihook.pytest_collectreport(report=rep)
if not rep.passed:
continue
subnodes = rep.result
# Prune this level.
any_matched_in_collector = False
for node in subnodes:
# Path part e.g. `/a/b/` in `/a/b/test_file.py::TestIt::test_it`.
if isinstance(matchparts[0], Path):
is_match = node.path == matchparts[0]
# Name part e.g. `TestIt` in `/a/b/test_file.py::TestIt::test_it`.
else:
# TODO: Remove parametrized workaround once collection structure contains
# parametrization.
is_match = (
node.name == matchparts[0]
or node.name.split("[")[0] == matchparts[0]
)
if is_match:
work.append((node, matchparts[1:]))
any_matched_in_collector = True
if not any_matched_in_collector:
notfound_collectors.append(matchnode)
if not any_matched_in_initial_part:
report_arg = "::".join((str(argpath), *names))
self._notfound.append((report_arg, notfound_collectors))
self.trace.root.indent -= 1
@@ -839,11 +893,17 @@ class Session(nodes.FSCollector):
yield node
else:
assert isinstance(node, nodes.Collector)
rep = collect_one_node(node)
keepduplicates = self.config.getoption("keepduplicates")
# For backward compat, dedup only applies to files.
handle_dupes = not (keepduplicates and isinstance(node, nodes.File))
rep, duplicate = self._collect_one_node(node, handle_dupes)
if duplicate and not keepduplicates:
return
if rep.passed:
for subnode in rep.result:
yield from self.genitems(subnode)
node.ihook.pytest_collectreport(report=rep)
if not duplicate:
node.ihook.pytest_collectreport(report=rep)
def search_pypath(module_name: str) -> str:

View File

@@ -152,12 +152,19 @@ class KeywordMatcher:
def from_item(cls, item: "Item") -> "KeywordMatcher":
mapped_names = set()
# Add the names of the current item and any parent items.
# Add the names of the current item and any parent items,
# except the Session and root Directory's which are not
# interesting for matching.
import pytest
for node in item.listchain():
if not isinstance(node, pytest.Session):
mapped_names.add(node.name)
if isinstance(node, pytest.Session):
continue
if isinstance(node, pytest.Directory) and isinstance(
node.parent, pytest.Session
):
continue
mapped_names.add(node.name)
# Add the names added as extra keywords to current or parent items.
mapped_names.update(item.listextrakeywords())

View File

@@ -676,6 +676,24 @@ class File(FSCollector, abc.ABC):
"""
class Directory(FSCollector, abc.ABC):
"""Base class for collecting files from a directory.
A basic directory collector does the following: goes over the files and
sub-directories in the directory and creates collectors for them by calling
the hooks :hook:`pytest_collect_directory` and :hook:`pytest_collect_file`,
after checking that they are not ignored using
:hook:`pytest_ignore_collect`.
The default directory collectors are :class:`~pytest.Dir` and
:class:`~pytest.Package`.
.. versionadded:: 8.0
:ref:`custom directory collectors`.
"""
class Item(Node, abc.ABC):
"""Base class of all test invocation items.

View File

@@ -689,10 +689,14 @@ def resolve_package_path(path: Path) -> Optional[Path]:
return result
def scandir(path: Union[str, "os.PathLike[str]"]) -> List["os.DirEntry[str]"]:
def scandir(
path: Union[str, "os.PathLike[str]"],
sort_key: Callable[["os.DirEntry[str]"], object] = lambda entry: entry.name,
) -> List["os.DirEntry[str]"]:
"""Scan a directory recursively, in breadth-first order.
The returned entries are sorted.
The returned entries are sorted according to the given key.
The default is to sort by name.
"""
entries = []
with os.scandir(path) as s:
@@ -706,7 +710,7 @@ def scandir(path: Union[str, "os.PathLike[str]"]) -> List["os.DirEntry[str]"]:
continue
raise
entries.append(entry)
entries.sort(key=lambda entry: entry.name)
entries.sort(key=sort_key) # type: ignore[arg-type]
return entries

View File

@@ -76,8 +76,7 @@ from _pytest.pathlib import bestrelpath
from _pytest.pathlib import fnmatch_ex
from _pytest.pathlib import import_path
from _pytest.pathlib import ImportPathMismatchError
from _pytest.pathlib import parts
from _pytest.pathlib import visit
from _pytest.pathlib import scandir
from _pytest.scope import _ScopeName
from _pytest.scope import Scope
from _pytest.stash import StashKey
@@ -204,6 +203,16 @@ def pytest_pyfunc_call(pyfuncitem: "Function") -> Optional[object]:
return True
def pytest_collect_directory(
path: Path, parent: nodes.Collector
) -> Optional[nodes.Collector]:
pkginit = path / "__init__.py"
if pkginit.is_file():
pkg: Package = Package.from_parent(parent, path=path)
return pkg
return None
def pytest_collect_file(file_path: Path, parent: nodes.Collector) -> Optional["Module"]:
if file_path.suffix == ".py":
if not parent.session.isinitpath(file_path):
@@ -659,9 +668,20 @@ class Module(nodes.File, PyCollector):
self.obj.__pytest_setup_function = xunit_setup_function_fixture
class Package(nodes.FSCollector):
class Package(nodes.Directory):
"""Collector for files and directories in a Python packages -- directories
with an `__init__.py` file."""
with an `__init__.py` file.
.. note::
Directories without an `__init__.py` file are instead collected by
:class:`~pytest.Dir` by default. Both are :class:`~pytest.Directory`
collectors.
.. versionchanged:: 8.0
Now inherits from :class:`~pytest.Directory`.
"""
def __init__(
self,
@@ -674,10 +694,9 @@ class Package(nodes.FSCollector):
path: Optional[Path] = None,
) -> None:
# NOTE: Could be just the following, but kept as-is for compat.
# nodes.FSCollector.__init__(self, fspath, parent=parent)
# super().__init__(self, fspath, parent=parent)
session = parent.session
nodes.FSCollector.__init__(
self,
super().__init__(
fspath=fspath,
path=path,
parent=parent,
@@ -685,7 +704,6 @@ class Package(nodes.FSCollector):
session=session,
nodeid=nodeid,
)
self.name = self.path.name
def setup(self) -> None:
init_mod = importtestmodule(self.path / "__init__.py", self.config)
@@ -705,66 +723,34 @@ class Package(nodes.FSCollector):
func = partial(_call_with_optional_argument, teardown_module, init_mod)
self.addfinalizer(func)
def _recurse(self, direntry: "os.DirEntry[str]") -> bool:
if direntry.name == "__pycache__":
return False
fspath = Path(direntry.path)
ihook = self.session.gethookproxy(fspath.parent)
if ihook.pytest_ignore_collect(collection_path=fspath, config=self.config):
return False
return True
def _collectfile(
self, fspath: Path, handle_dupes: bool = True
) -> Sequence[nodes.Collector]:
assert (
fspath.is_file()
), "{!r} is not a file (isdir={!r}, exists={!r}, islink={!r})".format(
fspath, fspath.is_dir(), fspath.exists(), fspath.is_symlink()
)
ihook = self.session.gethookproxy(fspath)
if not self.session.isinitpath(fspath):
if ihook.pytest_ignore_collect(collection_path=fspath, config=self.config):
return ()
if handle_dupes:
keepduplicates = self.config.getoption("keepduplicates")
if not keepduplicates:
duplicate_paths = self.config.pluginmanager._duplicatepaths
if fspath in duplicate_paths:
return ()
else:
duplicate_paths.add(fspath)
return ihook.pytest_collect_file(file_path=fspath, parent=self) # type: ignore[no-any-return]
def collect(self) -> Iterable[Union[nodes.Item, nodes.Collector]]:
# Always collect the __init__ first.
yield from self._collectfile(self.path / "__init__.py")
# Always collect __init__.py first.
def sort_key(entry: "os.DirEntry[str]") -> object:
return (entry.name != "__init__.py", entry.name)
pkg_prefixes: Set[Path] = set()
for direntry in visit(self.path, recurse=self._recurse):
path = Path(direntry.path)
# Already handled above.
if direntry.is_file():
if direntry.name == "__init__.py" and path.parent == self.path:
config = self.config
col: Optional[nodes.Collector]
cols: Sequence[nodes.Collector]
ihook = self.ihook
for direntry in scandir(self.path, sort_key):
if direntry.is_dir():
if direntry.name == "__pycache__":
continue
path = Path(direntry.path)
if not self.session.isinitpath(path, with_parents=True):
if ihook.pytest_ignore_collect(collection_path=path, config=config):
continue
col = ihook.pytest_collect_directory(path=path, parent=self)
if col is not None:
yield col
parts_ = parts(direntry.path)
if any(
str(pkg_prefix) in parts_ and pkg_prefix / "__init__.py" != path
for pkg_prefix in pkg_prefixes
):
continue
if direntry.is_file():
yield from self._collectfile(path)
elif not direntry.is_dir():
# Broken symlink or invalid/missing file.
continue
elif self._recurse(direntry) and path.joinpath("__init__.py").is_file():
pkg_prefixes.add(path)
elif direntry.is_file():
path = Path(direntry.path)
if not self.session.isinitpath(path):
if ihook.pytest_ignore_collect(collection_path=path, config=config):
continue
cols = ihook.pytest_collect_file(file_path=path, parent=self)
yield from cols
def _call_with_optional_argument(func, arg) -> None:

View File

@@ -28,6 +28,7 @@ from _pytest._code.code import TerminalRepr
from _pytest.config.argparsing import Parser
from _pytest.deprecated import check_ispytest
from _pytest.nodes import Collector
from _pytest.nodes import Directory
from _pytest.nodes import Item
from _pytest.nodes import Node
from _pytest.outcomes import Exit
@@ -368,7 +369,23 @@ def pytest_runtest_makereport(item: Item, call: CallInfo[None]) -> TestReport:
def pytest_make_collect_report(collector: Collector) -> CollectReport:
call = CallInfo.from_call(lambda: list(collector.collect()), "collect")
def collect() -> List[Union[Item, Collector]]:
# Before collecting, if this is a Directory, load the conftests.
# If a conftest import fails to load, it is considered a collection
# error of the Directory collector. This is why it's done inside of the
# CallInfo wrapper.
#
# Note: initial conftests are loaded early, not here.
if isinstance(collector, Directory):
collector.config.pluginmanager._loadconftestmodules(
collector.path,
collector.config.getoption("importmode"),
rootpath=collector.config.rootpath,
)
return list(collector.collect())
call = CallInfo.from_call(collect, "collect")
longrepr: Union[None, Tuple[str, int, str], str, TerminalRepr] = None
if not call.excinfo:
outcome: Literal["passed", "skipped", "failed"] = "passed"

View File

@@ -30,6 +30,7 @@ from _pytest.freeze_support import freeze_includes
from _pytest.legacypath import TempdirFactory
from _pytest.legacypath import Testdir
from _pytest.logging import LogCaptureFixture
from _pytest.main import Dir
from _pytest.main import Session
from _pytest.mark import Mark
from _pytest.mark import MARK_GEN as mark
@@ -38,6 +39,7 @@ from _pytest.mark import MarkGenerator
from _pytest.mark import param
from _pytest.monkeypatch import MonkeyPatch
from _pytest.nodes import Collector
from _pytest.nodes import Directory
from _pytest.nodes import File
from _pytest.nodes import Item
from _pytest.outcomes import exit
@@ -98,6 +100,8 @@ __all__ = [
"Config",
"console_main",
"deprecated_call",
"Dir",
"Directory",
"DoctestItem",
"exit",
"ExceptionInfo",