parent
c5fa4da229
commit
4209cce53f
|
@ -1,984 +0,0 @@
|
|||
import atexit
|
||||
import contextlib
|
||||
from enum import Enum
|
||||
from errno import EBADF
|
||||
from errno import ELOOP
|
||||
from errno import ENOENT
|
||||
from errno import ENOTDIR
|
||||
import fnmatch
|
||||
from functools import partial
|
||||
from importlib.machinery import ModuleSpec
|
||||
import importlib.util
|
||||
import itertools
|
||||
import os
|
||||
from os.path import expanduser
|
||||
from os.path import expandvars
|
||||
from os.path import isabs
|
||||
from os.path import sep
|
||||
from pathlib import Path
|
||||
from pathlib import PurePath
|
||||
from posixpath import sep as posix_sep
|
||||
import shutil
|
||||
import sys
|
||||
import types
|
||||
from types import ModuleType
|
||||
from typing import Any
|
||||
from typing import Callable
|
||||
from typing import Dict
|
||||
from typing import Iterable
|
||||
from typing import Iterator
|
||||
from typing import List
|
||||
from typing import Optional
|
||||
from typing import Set
|
||||
from typing import Tuple
|
||||
from typing import Type
|
||||
from typing import TypeVar
|
||||
from typing import Union
|
||||
import uuid
|
||||
import warnings
|
||||
|
||||
from _pytest.compat import assert_never
|
||||
from _pytest.outcomes import skip
|
||||
from _pytest.warning_types import PytestWarning
|
||||
|
||||
|
||||
LOCK_TIMEOUT = 60 * 60 * 24 * 3
|
||||
|
||||
|
||||
_AnyPurePath = TypeVar("_AnyPurePath", bound=PurePath)
|
||||
|
||||
# The following function, variables and comments were
|
||||
# copied from cpython 3.9 Lib/pathlib.py file.
|
||||
|
||||
# EBADF - guard against macOS `stat` throwing EBADF
|
||||
_IGNORED_ERRORS = (ENOENT, ENOTDIR, EBADF, ELOOP)
|
||||
|
||||
_IGNORED_WINERRORS = (
|
||||
21, # ERROR_NOT_READY - drive exists but is not accessible
|
||||
1921, # ERROR_CANT_RESOLVE_FILENAME - fix for broken symlink pointing to itself
|
||||
)
|
||||
|
||||
|
||||
def _ignore_error(exception: Exception) -> bool:
|
||||
return (
|
||||
getattr(exception, "errno", None) in _IGNORED_ERRORS
|
||||
or getattr(exception, "winerror", None) in _IGNORED_WINERRORS
|
||||
)
|
||||
|
||||
|
||||
def get_lock_path(path: _AnyPurePath) -> _AnyPurePath:
|
||||
return path.joinpath(".lock")
|
||||
|
||||
|
||||
def on_rm_rf_error(
|
||||
func: Optional[Callable[..., Any]],
|
||||
path: str,
|
||||
excinfo: Union[
|
||||
BaseException,
|
||||
Tuple[Type[BaseException], BaseException, Optional[types.TracebackType]],
|
||||
],
|
||||
*,
|
||||
start_path: Path,
|
||||
) -> bool:
|
||||
"""Handle known read-only errors during rmtree.
|
||||
|
||||
The returned value is used only by our own tests.
|
||||
"""
|
||||
if isinstance(excinfo, BaseException):
|
||||
exc = excinfo
|
||||
else:
|
||||
exc = excinfo[1]
|
||||
|
||||
# Another process removed the file in the middle of the "rm_rf" (xdist for example).
|
||||
# More context: https://github.com/pytest-dev/pytest/issues/5974#issuecomment-543799018
|
||||
if isinstance(exc, FileNotFoundError):
|
||||
return False
|
||||
|
||||
if not isinstance(exc, PermissionError):
|
||||
warnings.warn(
|
||||
PytestWarning(f"(rm_rf) error removing {path}\n{type(exc)}: {exc}")
|
||||
)
|
||||
return False
|
||||
|
||||
if func not in (os.rmdir, os.remove, os.unlink):
|
||||
if func not in (os.open,):
|
||||
warnings.warn(
|
||||
PytestWarning(
|
||||
f"(rm_rf) unknown function {func} when removing {path}:\n{type(exc)}: {exc}"
|
||||
)
|
||||
)
|
||||
return False
|
||||
|
||||
# Chmod + retry.
|
||||
import stat
|
||||
|
||||
def chmod_rw(p: str) -> None:
|
||||
mode = os.stat(p).st_mode
|
||||
os.chmod(p, mode | stat.S_IRUSR | stat.S_IWUSR)
|
||||
|
||||
# For files, we need to recursively go upwards in the directories to
|
||||
# ensure they all are also writable.
|
||||
p = Path(path)
|
||||
if p.is_file():
|
||||
for parent in p.parents:
|
||||
chmod_rw(str(parent))
|
||||
# Stop when we reach the original path passed to rm_rf.
|
||||
if parent == start_path:
|
||||
break
|
||||
chmod_rw(str(path))
|
||||
|
||||
func(path)
|
||||
return True
|
||||
|
||||
|
||||
def ensure_extended_length_path(path: Path) -> Path:
|
||||
"""Get the extended-length version of a path (Windows).
|
||||
|
||||
On Windows, by default, the maximum length of a path (MAX_PATH) is 260
|
||||
characters, and operations on paths longer than that fail. But it is possible
|
||||
to overcome this by converting the path to "extended-length" form before
|
||||
performing the operation:
|
||||
https://docs.microsoft.com/en-us/windows/win32/fileio/naming-a-file#maximum-path-length-limitation
|
||||
|
||||
On Windows, this function returns the extended-length absolute version of path.
|
||||
On other platforms it returns path unchanged.
|
||||
"""
|
||||
if sys.platform.startswith("win32"):
|
||||
path = path.resolve()
|
||||
path = Path(get_extended_length_path_str(str(path)))
|
||||
return path
|
||||
|
||||
|
||||
def get_extended_length_path_str(path: str) -> str:
|
||||
"""Convert a path to a Windows extended length path."""
|
||||
long_path_prefix = "\\\\?\\"
|
||||
unc_long_path_prefix = "\\\\?\\UNC\\"
|
||||
if path.startswith((long_path_prefix, unc_long_path_prefix)):
|
||||
return path
|
||||
# UNC
|
||||
if path.startswith("\\\\"):
|
||||
return unc_long_path_prefix + path[2:]
|
||||
return long_path_prefix + path
|
||||
|
||||
|
||||
def rm_rf(path: Path) -> None:
|
||||
"""Remove the path contents recursively, even if some elements
|
||||
are read-only."""
|
||||
path = ensure_extended_length_path(path)
|
||||
onerror = partial(on_rm_rf_error, start_path=path)
|
||||
if sys.version_info >= (3, 12):
|
||||
shutil.rmtree(str(path), onexc=onerror)
|
||||
else:
|
||||
shutil.rmtree(str(path), onerror=onerror)
|
||||
|
||||
|
||||
def find_prefixed(root: Path, prefix: str) -> Iterator["os.DirEntry[str]"]:
|
||||
"""Find all elements in root that begin with the prefix, case-insensitive."""
|
||||
l_prefix = prefix.lower()
|
||||
for x in os.scandir(root):
|
||||
if x.name.lower().startswith(l_prefix):
|
||||
yield x
|
||||
|
||||
|
||||
def extract_suffixes(iter: Iterable["os.DirEntry[str]"], prefix: str) -> Iterator[str]:
|
||||
"""Return the parts of the paths following the prefix.
|
||||
|
||||
:param iter: Iterator over path names.
|
||||
:param prefix: Expected prefix of the path names.
|
||||
"""
|
||||
p_len = len(prefix)
|
||||
for entry in iter:
|
||||
yield entry.name[p_len:]
|
||||
|
||||
|
||||
def find_suffixes(root: Path, prefix: str) -> Iterator[str]:
|
||||
"""Combine find_prefixes and extract_suffixes."""
|
||||
return extract_suffixes(find_prefixed(root, prefix), prefix)
|
||||
|
||||
|
||||
def parse_num(maybe_num: str) -> int:
|
||||
"""Parse number path suffixes, returns -1 on error."""
|
||||
try:
|
||||
return int(maybe_num)
|
||||
except ValueError:
|
||||
return -1
|
||||
|
||||
|
||||
def _force_symlink(
|
||||
root: Path, target: Union[str, PurePath], link_to: Union[str, Path]
|
||||
) -> None:
|
||||
"""Helper to create the current symlink.
|
||||
|
||||
It's full of race conditions that are reasonably OK to ignore
|
||||
for the context of best effort linking to the latest test run.
|
||||
|
||||
The presumption being that in case of much parallelism
|
||||
the inaccuracy is going to be acceptable.
|
||||
"""
|
||||
current_symlink = root.joinpath(target)
|
||||
try:
|
||||
current_symlink.unlink()
|
||||
except OSError:
|
||||
pass
|
||||
try:
|
||||
current_symlink.symlink_to(link_to)
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
|
||||
def make_numbered_dir(root: Path, prefix: str, mode: int = 0o700) -> Path:
|
||||
"""Create a directory with an increased number as suffix for the given prefix."""
|
||||
for i in range(10):
|
||||
# try up to 10 times to create the folder
|
||||
max_existing = max(map(parse_num, find_suffixes(root, prefix)), default=-1)
|
||||
new_number = max_existing + 1
|
||||
new_path = root.joinpath(f"{prefix}{new_number}")
|
||||
try:
|
||||
new_path.mkdir(mode=mode)
|
||||
except Exception:
|
||||
pass
|
||||
else:
|
||||
_force_symlink(root, prefix + "current", new_path)
|
||||
return new_path
|
||||
else:
|
||||
raise OSError(
|
||||
"could not create numbered dir with prefix "
|
||||
f"{prefix} in {root} after 10 tries"
|
||||
)
|
||||
|
||||
|
||||
def create_cleanup_lock(p: Path) -> Path:
|
||||
"""Create a lock to prevent premature folder cleanup."""
|
||||
lock_path = get_lock_path(p)
|
||||
try:
|
||||
fd = os.open(str(lock_path), os.O_WRONLY | os.O_CREAT | os.O_EXCL, 0o644)
|
||||
except FileExistsError as e:
|
||||
raise OSError(f"cannot create lockfile in {p}") from e
|
||||
else:
|
||||
pid = os.getpid()
|
||||
spid = str(pid).encode()
|
||||
os.write(fd, spid)
|
||||
os.close(fd)
|
||||
if not lock_path.is_file():
|
||||
raise OSError("lock path got renamed after successful creation")
|
||||
return lock_path
|
||||
|
||||
|
||||
def register_cleanup_lock_removal(
|
||||
lock_path: Path, register: Any = atexit.register
|
||||
) -> Any:
|
||||
"""Register a cleanup function for removing a lock, by default on atexit."""
|
||||
pid = os.getpid()
|
||||
|
||||
def cleanup_on_exit(lock_path: Path = lock_path, original_pid: int = pid) -> None:
|
||||
current_pid = os.getpid()
|
||||
if current_pid != original_pid:
|
||||
# fork
|
||||
return
|
||||
try:
|
||||
lock_path.unlink()
|
||||
except OSError:
|
||||
pass
|
||||
|
||||
return register(cleanup_on_exit)
|
||||
|
||||
|
||||
def maybe_delete_a_numbered_dir(path: Path) -> None:
|
||||
"""Remove a numbered directory if its lock can be obtained and it does
|
||||
not seem to be in use."""
|
||||
path = ensure_extended_length_path(path)
|
||||
lock_path = None
|
||||
try:
|
||||
lock_path = create_cleanup_lock(path)
|
||||
parent = path.parent
|
||||
|
||||
garbage = parent.joinpath(f"garbage-{uuid.uuid4()}")
|
||||
path.rename(garbage)
|
||||
rm_rf(garbage)
|
||||
except OSError:
|
||||
# known races:
|
||||
# * other process did a cleanup at the same time
|
||||
# * deletable folder was found
|
||||
# * process cwd (Windows)
|
||||
return
|
||||
finally:
|
||||
# If we created the lock, ensure we remove it even if we failed
|
||||
# to properly remove the numbered dir.
|
||||
if lock_path is not None:
|
||||
try:
|
||||
lock_path.unlink()
|
||||
except OSError:
|
||||
pass
|
||||
|
||||
|
||||
def ensure_deletable(path: Path, consider_lock_dead_if_created_before: float) -> bool:
|
||||
"""Check if `path` is deletable based on whether the lock file is expired."""
|
||||
if path.is_symlink():
|
||||
return False
|
||||
lock = get_lock_path(path)
|
||||
try:
|
||||
if not lock.is_file():
|
||||
return True
|
||||
except OSError:
|
||||
# we might not have access to the lock file at all, in this case assume
|
||||
# we don't have access to the entire directory (#7491).
|
||||
return False
|
||||
try:
|
||||
lock_time = lock.stat().st_mtime
|
||||
except Exception:
|
||||
return False
|
||||
else:
|
||||
if lock_time < consider_lock_dead_if_created_before:
|
||||
# We want to ignore any errors while trying to remove the lock such as:
|
||||
# - PermissionDenied, like the file permissions have changed since the lock creation;
|
||||
# - FileNotFoundError, in case another pytest process got here first;
|
||||
# and any other cause of failure.
|
||||
with contextlib.suppress(OSError):
|
||||
lock.unlink()
|
||||
return True
|
||||
return False
|
||||
|
||||
|
||||
def try_cleanup(path: Path, consider_lock_dead_if_created_before: float) -> None:
|
||||
"""Try to cleanup a folder if we can ensure it's deletable."""
|
||||
if ensure_deletable(path, consider_lock_dead_if_created_before):
|
||||
maybe_delete_a_numbered_dir(path)
|
||||
|
||||
|
||||
def cleanup_candidates(root: Path, prefix: str, keep: int) -> Iterator[Path]:
|
||||
"""List candidates for numbered directories to be removed - follows py.path."""
|
||||
max_existing = max(map(parse_num, find_suffixes(root, prefix)), default=-1)
|
||||
max_delete = max_existing - keep
|
||||
entries = find_prefixed(root, prefix)
|
||||
entries, entries2 = itertools.tee(entries)
|
||||
numbers = map(parse_num, extract_suffixes(entries2, prefix))
|
||||
for entry, number in zip(entries, numbers):
|
||||
if number <= max_delete:
|
||||
yield Path(entry)
|
||||
|
||||
|
||||
def cleanup_dead_symlinks(root: Path) -> None:
|
||||
for left_dir in root.iterdir():
|
||||
if left_dir.is_symlink():
|
||||
if not left_dir.resolve().exists():
|
||||
left_dir.unlink()
|
||||
|
||||
|
||||
def cleanup_numbered_dir(
|
||||
root: Path, prefix: str, keep: int, consider_lock_dead_if_created_before: float
|
||||
) -> None:
|
||||
"""Cleanup for lock driven numbered directories."""
|
||||
if not root.exists():
|
||||
return
|
||||
for path in cleanup_candidates(root, prefix, keep):
|
||||
try_cleanup(path, consider_lock_dead_if_created_before)
|
||||
for path in root.glob("garbage-*"):
|
||||
try_cleanup(path, consider_lock_dead_if_created_before)
|
||||
|
||||
cleanup_dead_symlinks(root)
|
||||
|
||||
|
||||
def make_numbered_dir_with_cleanup(
|
||||
root: Path,
|
||||
prefix: str,
|
||||
keep: int,
|
||||
lock_timeout: float,
|
||||
mode: int,
|
||||
) -> Path:
|
||||
"""Create a numbered dir with a cleanup lock and remove old ones."""
|
||||
e = None
|
||||
for i in range(10):
|
||||
try:
|
||||
p = make_numbered_dir(root, prefix, mode)
|
||||
# Only lock the current dir when keep is not 0
|
||||
if keep != 0:
|
||||
lock_path = create_cleanup_lock(p)
|
||||
register_cleanup_lock_removal(lock_path)
|
||||
except Exception as exc:
|
||||
e = exc
|
||||
else:
|
||||
consider_lock_dead_if_created_before = p.stat().st_mtime - lock_timeout
|
||||
# Register a cleanup for program exit
|
||||
atexit.register(
|
||||
cleanup_numbered_dir,
|
||||
root,
|
||||
prefix,
|
||||
keep,
|
||||
consider_lock_dead_if_created_before,
|
||||
)
|
||||
return p
|
||||
assert e is not None
|
||||
raise e
|
||||
|
||||
|
||||
def resolve_from_str(input: str, rootpath: Path) -> Path:
|
||||
input = expanduser(input)
|
||||
input = expandvars(input)
|
||||
if isabs(input):
|
||||
return Path(input)
|
||||
else:
|
||||
return rootpath.joinpath(input)
|
||||
|
||||
|
||||
def fnmatch_ex(pattern: str, path: Union[str, "os.PathLike[str]"]) -> bool:
|
||||
"""A port of FNMatcher from py.path.common which works with PurePath() instances.
|
||||
|
||||
The difference between this algorithm and PurePath.match() is that the
|
||||
latter matches "**" glob expressions for each part of the path, while
|
||||
this algorithm uses the whole path instead.
|
||||
|
||||
For example:
|
||||
"tests/foo/bar/doc/test_foo.py" matches pattern "tests/**/doc/test*.py"
|
||||
with this algorithm, but not with PurePath.match().
|
||||
|
||||
This algorithm was ported to keep backward-compatibility with existing
|
||||
settings which assume paths match according this logic.
|
||||
|
||||
References:
|
||||
* https://bugs.python.org/issue29249
|
||||
* https://bugs.python.org/issue34731
|
||||
"""
|
||||
path = PurePath(path)
|
||||
iswin32 = sys.platform.startswith("win")
|
||||
|
||||
if iswin32 and sep not in pattern and posix_sep in pattern:
|
||||
# Running on Windows, the pattern has no Windows path separators,
|
||||
# and the pattern has one or more Posix path separators. Replace
|
||||
# the Posix path separators with the Windows path separator.
|
||||
pattern = pattern.replace(posix_sep, sep)
|
||||
|
||||
if sep not in pattern:
|
||||
name = path.name
|
||||
else:
|
||||
name = str(path)
|
||||
if path.is_absolute() and not os.path.isabs(pattern):
|
||||
pattern = f"*{os.sep}{pattern}"
|
||||
return fnmatch.fnmatch(name, pattern)
|
||||
|
||||
|
||||
def parts(s: str) -> Set[str]:
|
||||
parts = s.split(sep)
|
||||
return {sep.join(parts[: i + 1]) or sep for i in range(len(parts))}
|
||||
|
||||
|
||||
def symlink_or_skip(
|
||||
src: Union["os.PathLike[str]", str],
|
||||
dst: Union["os.PathLike[str]", str],
|
||||
**kwargs: Any,
|
||||
) -> None:
|
||||
"""Make a symlink, or skip the test in case symlinks are not supported."""
|
||||
try:
|
||||
os.symlink(src, dst, **kwargs)
|
||||
except OSError as e:
|
||||
skip(f"symlinks not supported: {e}")
|
||||
|
||||
|
||||
class ImportMode(Enum):
|
||||
"""Possible values for `mode` parameter of `import_path`."""
|
||||
|
||||
prepend = "prepend"
|
||||
append = "append"
|
||||
importlib = "importlib"
|
||||
|
||||
|
||||
class ImportPathMismatchError(ImportError):
|
||||
"""Raised on import_path() if there is a mismatch of __file__'s.
|
||||
|
||||
This can happen when `import_path` is called multiple times with different filenames that has
|
||||
the same basename but reside in packages
|
||||
(for example "/tests1/test_foo.py" and "/tests2/test_foo.py").
|
||||
"""
|
||||
|
||||
|
||||
def import_path(
|
||||
path: Union[str, "os.PathLike[str]"],
|
||||
*,
|
||||
mode: Union[str, ImportMode] = ImportMode.prepend,
|
||||
root: Path,
|
||||
consider_namespace_packages: bool,
|
||||
) -> ModuleType:
|
||||
"""
|
||||
Import and return a module from the given path, which can be a file (a module) or
|
||||
a directory (a package).
|
||||
|
||||
:param path:
|
||||
Path to the file to import.
|
||||
|
||||
:param mode:
|
||||
Controls the underlying import mechanism that will be used:
|
||||
|
||||
* ImportMode.prepend: the directory containing the module (or package, taking
|
||||
`__init__.py` files into account) will be put at the *start* of `sys.path` before
|
||||
being imported with `importlib.import_module`.
|
||||
|
||||
* ImportMode.append: same as `prepend`, but the directory will be appended
|
||||
to the end of `sys.path`, if not already in `sys.path`.
|
||||
|
||||
* ImportMode.importlib: uses more fine control mechanisms provided by `importlib`
|
||||
to import the module, which avoids having to muck with `sys.path` at all. It effectively
|
||||
allows having same-named test modules in different places.
|
||||
|
||||
:param root:
|
||||
Used as an anchor when mode == ImportMode.importlib to obtain
|
||||
a unique name for the module being imported so it can safely be stored
|
||||
into ``sys.modules``.
|
||||
|
||||
:param consider_namespace_packages:
|
||||
If True, consider namespace packages when resolving module names.
|
||||
|
||||
:raises ImportPathMismatchError:
|
||||
If after importing the given `path` and the module `__file__`
|
||||
are different. Only raised in `prepend` and `append` modes.
|
||||
"""
|
||||
path = Path(path)
|
||||
mode = ImportMode(mode)
|
||||
|
||||
if not path.exists():
|
||||
raise ImportError(path)
|
||||
|
||||
if mode is ImportMode.importlib:
|
||||
# Try to import this module using the standard import mechanisms, but
|
||||
# without touching sys.path.
|
||||
try:
|
||||
pkg_root, module_name = resolve_pkg_root_and_module_name(
|
||||
path, consider_namespace_packages=consider_namespace_packages
|
||||
)
|
||||
except CouldNotResolvePathError:
|
||||
pass
|
||||
else:
|
||||
# If the given module name is already in sys.modules, do not import it again.
|
||||
with contextlib.suppress(KeyError):
|
||||
return sys.modules[module_name]
|
||||
|
||||
mod = _import_module_using_spec(
|
||||
module_name, path, pkg_root, insert_modules=False
|
||||
)
|
||||
if mod is not None:
|
||||
return mod
|
||||
|
||||
# Could not import the module with the current sys.path, so we fall back
|
||||
# to importing the file as a single module, not being a part of a package.
|
||||
module_name = module_name_from_path(path, root)
|
||||
with contextlib.suppress(KeyError):
|
||||
return sys.modules[module_name]
|
||||
|
||||
mod = _import_module_using_spec(
|
||||
module_name, path, path.parent, insert_modules=True
|
||||
)
|
||||
if mod is None:
|
||||
raise ImportError(f"Can't find module {module_name} at location {path}")
|
||||
return mod
|
||||
|
||||
try:
|
||||
pkg_root, module_name = resolve_pkg_root_and_module_name(
|
||||
path, consider_namespace_packages=consider_namespace_packages
|
||||
)
|
||||
except CouldNotResolvePathError:
|
||||
pkg_root, module_name = path.parent, path.stem
|
||||
|
||||
# Change sys.path permanently: restoring it at the end of this function would cause surprising
|
||||
# problems because of delayed imports: for example, a conftest.py file imported by this function
|
||||
# might have local imports, which would fail at runtime if we restored sys.path.
|
||||
if mode is ImportMode.append:
|
||||
if str(pkg_root) not in sys.path:
|
||||
sys.path.append(str(pkg_root))
|
||||
elif mode is ImportMode.prepend:
|
||||
if str(pkg_root) != sys.path[0]:
|
||||
sys.path.insert(0, str(pkg_root))
|
||||
else:
|
||||
assert_never(mode)
|
||||
|
||||
importlib.import_module(module_name)
|
||||
|
||||
mod = sys.modules[module_name]
|
||||
if path.name == "__init__.py":
|
||||
return mod
|
||||
|
||||
ignore = os.environ.get("PY_IGNORE_IMPORTMISMATCH", "")
|
||||
if ignore != "1":
|
||||
module_file = mod.__file__
|
||||
if module_file is None:
|
||||
raise ImportPathMismatchError(module_name, module_file, path)
|
||||
|
||||
if module_file.endswith((".pyc", ".pyo")):
|
||||
module_file = module_file[:-1]
|
||||
if module_file.endswith(os.sep + "__init__.py"):
|
||||
module_file = module_file[: -(len(os.sep + "__init__.py"))]
|
||||
|
||||
try:
|
||||
is_same = _is_same(str(path), module_file)
|
||||
except FileNotFoundError:
|
||||
is_same = False
|
||||
|
||||
if not is_same:
|
||||
raise ImportPathMismatchError(module_name, module_file, path)
|
||||
|
||||
return mod
|
||||
|
||||
|
||||
def _import_module_using_spec(
|
||||
module_name: str, module_path: Path, module_location: Path, *, insert_modules: bool
|
||||
) -> Optional[ModuleType]:
|
||||
"""
|
||||
Tries to import a module by its canonical name, path to the .py file, and its
|
||||
parent location.
|
||||
|
||||
:param insert_modules:
|
||||
If True, will call insert_missing_modules to create empty intermediate modules
|
||||
for made-up module names (when importing test files not reachable from sys.path).
|
||||
"""
|
||||
# Checking with sys.meta_path first in case one of its hooks can import this module,
|
||||
# such as our own assertion-rewrite hook.
|
||||
for meta_importer in sys.meta_path:
|
||||
spec = meta_importer.find_spec(module_name, [str(module_location)])
|
||||
if spec_matches_module_path(spec, module_path):
|
||||
break
|
||||
else:
|
||||
spec = importlib.util.spec_from_file_location(module_name, str(module_path))
|
||||
|
||||
if spec_matches_module_path(spec, module_path):
|
||||
assert spec is not None
|
||||
# Attempt to import the parent module, seems is our responsibility:
|
||||
# https://github.com/python/cpython/blob/73906d5c908c1e0b73c5436faeff7d93698fc074/Lib/importlib/_bootstrap.py#L1308-L1311
|
||||
parent_module_name, _, name = module_name.rpartition(".")
|
||||
parent_module: Optional[ModuleType] = None
|
||||
if parent_module_name:
|
||||
parent_module = sys.modules.get(parent_module_name)
|
||||
if parent_module is None:
|
||||
# Find the directory of this module's parent.
|
||||
parent_dir = (
|
||||
module_path.parent.parent
|
||||
if module_path.name == "__init__.py"
|
||||
else module_path.parent
|
||||
)
|
||||
# Consider the parent module path as its __init__.py file, if it has one.
|
||||
parent_module_path = (
|
||||
parent_dir / "__init__.py"
|
||||
if (parent_dir / "__init__.py").is_file()
|
||||
else parent_dir
|
||||
)
|
||||
parent_module = _import_module_using_spec(
|
||||
parent_module_name,
|
||||
parent_module_path,
|
||||
parent_dir,
|
||||
insert_modules=insert_modules,
|
||||
)
|
||||
|
||||
# Find spec and import this module.
|
||||
mod = importlib.util.module_from_spec(spec)
|
||||
sys.modules[module_name] = mod
|
||||
spec.loader.exec_module(mod) # type: ignore[union-attr]
|
||||
|
||||
# Set this module as an attribute of the parent module (#12194).
|
||||
if parent_module is not None:
|
||||
setattr(parent_module, name, mod)
|
||||
|
||||
if insert_modules:
|
||||
insert_missing_modules(sys.modules, module_name)
|
||||
return mod
|
||||
|
||||
return None
|
||||
|
||||
|
||||
def spec_matches_module_path(
|
||||
module_spec: Optional[ModuleSpec], module_path: Path
|
||||
) -> bool:
|
||||
"""Return true if the given ModuleSpec can be used to import the given module path."""
|
||||
if module_spec is None or module_spec.origin is None:
|
||||
return False
|
||||
|
||||
return Path(module_spec.origin) == module_path
|
||||
|
||||
|
||||
# Implement a special _is_same function on Windows which returns True if the two filenames
|
||||
# compare equal, to circumvent os.path.samefile returning False for mounts in UNC (#7678).
|
||||
if sys.platform.startswith("win"):
|
||||
|
||||
def _is_same(f1: str, f2: str) -> bool:
|
||||
return Path(f1) == Path(f2) or os.path.samefile(f1, f2)
|
||||
|
||||
else:
|
||||
|
||||
def _is_same(f1: str, f2: str) -> bool:
|
||||
return os.path.samefile(f1, f2)
|
||||
|
||||
|
||||
def module_name_from_path(path: Path, root: Path) -> str:
|
||||
"""
|
||||
Return a dotted module name based on the given path, anchored on root.
|
||||
|
||||
For example: path="projects/src/tests/test_foo.py" and root="/projects", the
|
||||
resulting module name will be "src.tests.test_foo".
|
||||
"""
|
||||
path = path.with_suffix("")
|
||||
try:
|
||||
relative_path = path.relative_to(root)
|
||||
except ValueError:
|
||||
# If we can't get a relative path to root, use the full path, except
|
||||
# for the first part ("d:\\" or "/" depending on the platform, for example).
|
||||
path_parts = path.parts[1:]
|
||||
else:
|
||||
# Use the parts for the relative path to the root path.
|
||||
path_parts = relative_path.parts
|
||||
|
||||
# Module name for packages do not contain the __init__ file, unless
|
||||
# the `__init__.py` file is at the root.
|
||||
if len(path_parts) >= 2 and path_parts[-1] == "__init__":
|
||||
path_parts = path_parts[:-1]
|
||||
|
||||
# Module names cannot contain ".", normalize them to "_". This prevents
|
||||
# a directory having a "." in the name (".env.310" for example) causing extra intermediate modules.
|
||||
# Also, important to replace "." at the start of paths, as those are considered relative imports.
|
||||
path_parts = tuple(x.replace(".", "_") for x in path_parts)
|
||||
|
||||
return ".".join(path_parts)
|
||||
|
||||
|
||||
def insert_missing_modules(modules: Dict[str, ModuleType], module_name: str) -> None:
|
||||
"""
|
||||
Used by ``import_path`` to create intermediate modules when using mode=importlib.
|
||||
|
||||
When we want to import a module as "src.tests.test_foo" for example, we need
|
||||
to create empty modules "src" and "src.tests" after inserting "src.tests.test_foo",
|
||||
otherwise "src.tests.test_foo" is not importable by ``__import__``.
|
||||
"""
|
||||
module_parts = module_name.split(".")
|
||||
while module_name:
|
||||
parent_module_name, _, child_name = module_name.rpartition(".")
|
||||
if parent_module_name:
|
||||
parent_module = modules.get(parent_module_name)
|
||||
if parent_module is None:
|
||||
try:
|
||||
# If sys.meta_path is empty, calling import_module will issue
|
||||
# a warning and raise ModuleNotFoundError. To avoid the
|
||||
# warning, we check sys.meta_path explicitly and raise the error
|
||||
# ourselves to fall back to creating a dummy module.
|
||||
if not sys.meta_path:
|
||||
raise ModuleNotFoundError
|
||||
parent_module = importlib.import_module(parent_module_name)
|
||||
except ModuleNotFoundError:
|
||||
parent_module = ModuleType(
|
||||
module_name,
|
||||
doc="Empty module created by pytest's importmode=importlib.",
|
||||
)
|
||||
modules[parent_module_name] = parent_module
|
||||
|
||||
# Add child attribute to the parent that can reference the child
|
||||
# modules.
|
||||
if not hasattr(parent_module, child_name):
|
||||
setattr(parent_module, child_name, modules[module_name])
|
||||
|
||||
module_parts.pop(-1)
|
||||
module_name = ".".join(module_parts)
|
||||
|
||||
|
||||
def resolve_package_path(path: Path) -> Optional[Path]:
|
||||
"""Return the Python package path by looking for the last
|
||||
directory upwards which still contains an __init__.py.
|
||||
|
||||
Returns None if it cannot be determined.
|
||||
"""
|
||||
result = None
|
||||
for parent in itertools.chain((path,), path.parents):
|
||||
if parent.is_dir():
|
||||
if not (parent / "__init__.py").is_file():
|
||||
break
|
||||
if not parent.name.isidentifier():
|
||||
break
|
||||
result = parent
|
||||
return result
|
||||
|
||||
|
||||
def resolve_pkg_root_and_module_name(
|
||||
path: Path, *, consider_namespace_packages: bool = False
|
||||
) -> Tuple[Path, str]:
|
||||
"""
|
||||
Return the path to the directory of the root package that contains the
|
||||
given Python file, and its module name:
|
||||
|
||||
src/
|
||||
app/
|
||||
__init__.py
|
||||
core/
|
||||
__init__.py
|
||||
models.py
|
||||
|
||||
Passing the full path to `models.py` will yield Path("src") and "app.core.models".
|
||||
|
||||
If consider_namespace_packages is True, then we additionally check upwards in the hierarchy
|
||||
for namespace packages:
|
||||
|
||||
https://packaging.python.org/en/latest/guides/packaging-namespace-packages
|
||||
|
||||
Raises CouldNotResolvePathError if the given path does not belong to a package (missing any __init__.py files).
|
||||
"""
|
||||
pkg_root: Optional[Path] = None
|
||||
pkg_path = resolve_package_path(path)
|
||||
if pkg_path is not None:
|
||||
pkg_root = pkg_path.parent
|
||||
if consider_namespace_packages:
|
||||
start = pkg_root if pkg_root is not None else path.parent
|
||||
for candidate in (start, *start.parents):
|
||||
module_name = compute_module_name(candidate, path)
|
||||
if module_name and is_importable(module_name, path):
|
||||
# Point the pkg_root to the root of the namespace package.
|
||||
pkg_root = candidate
|
||||
break
|
||||
|
||||
if pkg_root is not None:
|
||||
module_name = compute_module_name(pkg_root, path)
|
||||
if module_name:
|
||||
return pkg_root, module_name
|
||||
|
||||
raise CouldNotResolvePathError(f"Could not resolve for {path}")
|
||||
|
||||
|
||||
def is_importable(module_name: str, module_path: Path) -> bool:
|
||||
"""
|
||||
Return if the given module path could be imported normally by Python, akin to the user
|
||||
entering the REPL and importing the corresponding module name directly, and corresponds
|
||||
to the module_path specified.
|
||||
|
||||
:param module_name:
|
||||
Full module name that we want to check if is importable.
|
||||
For example, "app.models".
|
||||
|
||||
:param module_path:
|
||||
Full path to the python module/package we want to check if is importable.
|
||||
For example, "/projects/src/app/models.py".
|
||||
"""
|
||||
try:
|
||||
# Note this is different from what we do in ``_import_module_using_spec``, where we explicitly search through
|
||||
# sys.meta_path to be able to pass the path of the module that we want to import (``meta_importer.find_spec``).
|
||||
# Using importlib.util.find_spec() is different, it gives the same results as trying to import
|
||||
# the module normally in the REPL.
|
||||
spec = importlib.util.find_spec(module_name)
|
||||
except (ImportError, ValueError, ImportWarning):
|
||||
return False
|
||||
else:
|
||||
return spec_matches_module_path(spec, module_path)
|
||||
|
||||
|
||||
def compute_module_name(root: Path, module_path: Path) -> Optional[str]:
|
||||
"""Compute a module name based on a path and a root anchor."""
|
||||
try:
|
||||
path_without_suffix = module_path.with_suffix("")
|
||||
except ValueError:
|
||||
# Empty paths (such as Path.cwd()) might break meta_path hooks (like our own assertion rewriter).
|
||||
return None
|
||||
|
||||
try:
|
||||
relative = path_without_suffix.relative_to(root)
|
||||
except ValueError: # pragma: no cover
|
||||
return None
|
||||
names = list(relative.parts)
|
||||
if not names:
|
||||
return None
|
||||
if names[-1] == "__init__":
|
||||
names.pop()
|
||||
return ".".join(names)
|
||||
|
||||
|
||||
class CouldNotResolvePathError(Exception):
|
||||
"""Custom exception raised by resolve_pkg_root_and_module_name."""
|
||||
|
||||
|
||||
def scandir(
|
||||
path: Union[str, "os.PathLike[str]"],
|
||||
sort_key: Callable[["os.DirEntry[str]"], object] = lambda entry: entry.name,
|
||||
) -> List["os.DirEntry[str]"]:
|
||||
"""Scan a directory recursively, in breadth-first order.
|
||||
|
||||
The returned entries are sorted according to the given key.
|
||||
The default is to sort by name.
|
||||
"""
|
||||
entries = []
|
||||
with os.scandir(path) as s:
|
||||
# Skip entries with symlink loops and other brokenness, so the caller
|
||||
# doesn't have to deal with it.
|
||||
for entry in s:
|
||||
try:
|
||||
entry.is_file()
|
||||
except OSError as err:
|
||||
if _ignore_error(err):
|
||||
continue
|
||||
raise
|
||||
entries.append(entry)
|
||||
entries.sort(key=sort_key) # type: ignore[arg-type]
|
||||
return entries
|
||||
|
||||
|
||||
def visit(
|
||||
path: Union[str, "os.PathLike[str]"], recurse: Callable[["os.DirEntry[str]"], bool]
|
||||
) -> Iterator["os.DirEntry[str]"]:
|
||||
"""Walk a directory recursively, in breadth-first order.
|
||||
|
||||
The `recurse` predicate determines whether a directory is recursed.
|
||||
|
||||
Entries at each directory level are sorted.
|
||||
"""
|
||||
entries = scandir(path)
|
||||
yield from entries
|
||||
for entry in entries:
|
||||
if entry.is_dir() and recurse(entry):
|
||||
yield from visit(entry.path, recurse)
|
||||
|
||||
|
||||
def absolutepath(path: "Union[str, os.PathLike[str]]") -> Path:
|
||||
"""Convert a path to an absolute path using os.path.abspath.
|
||||
|
||||
Prefer this over Path.resolve() (see #6523).
|
||||
Prefer this over Path.absolute() (not public, doesn't normalize).
|
||||
"""
|
||||
return Path(os.path.abspath(path))
|
||||
|
||||
|
||||
def commonpath(path1: Path, path2: Path) -> Optional[Path]:
|
||||
"""Return the common part shared with the other path, or None if there is
|
||||
no common part.
|
||||
|
||||
If one path is relative and one is absolute, returns None.
|
||||
"""
|
||||
try:
|
||||
return Path(os.path.commonpath((str(path1), str(path2))))
|
||||
except ValueError:
|
||||
return None
|
||||
|
||||
|
||||
def bestrelpath(directory: Path, dest: Path) -> str:
|
||||
"""Return a string which is a relative path from directory to dest such
|
||||
that directory/bestrelpath == dest.
|
||||
|
||||
The paths must be either both absolute or both relative.
|
||||
|
||||
If no such path can be determined, returns dest.
|
||||
"""
|
||||
assert isinstance(directory, Path)
|
||||
assert isinstance(dest, Path)
|
||||
if dest == directory:
|
||||
return os.curdir
|
||||
# Find the longest common directory.
|
||||
base = commonpath(directory, dest)
|
||||
# Can be the case on Windows for two absolute paths on different drives.
|
||||
# Can be the case for two relative paths without common prefix.
|
||||
# Can be the case for a relative path and an absolute path.
|
||||
if not base:
|
||||
return str(dest)
|
||||
reldirectory = directory.relative_to(base)
|
||||
reldest = dest.relative_to(base)
|
||||
return os.path.join(
|
||||
# Back from directory to base.
|
||||
*([os.pardir] * len(reldirectory.parts)),
|
||||
# Forward from base to dest.
|
||||
*reldest.parts,
|
||||
)
|
||||
|
||||
|
||||
def safe_exists(p: Path) -> bool:
|
||||
"""Like Path.exists(), but account for input arguments that might be too long (#11394)."""
|
||||
try:
|
||||
return p.exists()
|
||||
except (ValueError, OSError):
|
||||
# ValueError: stat: path too long for Windows
|
||||
# OSError: [WinError 123] The filename, directory name, or volume label syntax is incorrect
|
||||
return False
|
Loading…
Reference in New Issue