diff --git a/src/_pytest/pathlib.py b/src/_pytest/pathlib.py index f45b0bab7..543103fb5 100644 --- a/src/_pytest/pathlib.py +++ b/src/_pytest/pathlib.py @@ -1,7 +1,6 @@ import atexit import fnmatch import itertools -import operator import os import shutil import sys @@ -13,6 +12,11 @@ from os.path import expandvars from os.path import isabs from os.path import sep from posixpath import sep as posix_sep +from typing import Iterable +from typing import Iterator +from typing import Set +from typing import TypeVar +from typing import Union from _pytest.warning_types import PytestWarning @@ -26,10 +30,15 @@ __all__ = ["Path", "PurePath"] LOCK_TIMEOUT = 60 * 60 * 3 -get_lock_path = operator.methodcaller("joinpath", ".lock") + +_AnyPurePath = TypeVar("_AnyPurePath", bound=PurePath) -def ensure_reset_dir(path): +def get_lock_path(path: _AnyPurePath) -> _AnyPurePath: + return path.joinpath(".lock") + + +def ensure_reset_dir(path: Path) -> None: """ ensures the given path is an empty directory """ @@ -38,7 +47,7 @@ def ensure_reset_dir(path): path.mkdir() -def on_rm_rf_error(func, path: str, exc, *, start_path) -> bool: +def on_rm_rf_error(func, path: str, exc, *, start_path: Path) -> bool: """Handles known read-only errors during rmtree. The returned value is used only by our own tests. @@ -71,7 +80,7 @@ def on_rm_rf_error(func, path: str, exc, *, start_path) -> bool: # Chmod + retry. import stat - def chmod_rw(p: str): + def chmod_rw(p: str) -> None: mode = os.stat(p).st_mode os.chmod(p, mode | stat.S_IRUSR | stat.S_IWUSR) @@ -90,7 +99,7 @@ def on_rm_rf_error(func, path: str, exc, *, start_path) -> bool: return True -def rm_rf(path: Path): +def rm_rf(path: Path) -> None: """Remove the path contents recursively, even if some elements are read-only. """ @@ -98,7 +107,7 @@ def rm_rf(path: Path): shutil.rmtree(str(path), onerror=onerror) -def find_prefixed(root, prefix): +def find_prefixed(root: Path, prefix: str) -> Iterator[Path]: """finds all elements in root that begin with the prefix, case insensitive""" l_prefix = prefix.lower() for x in root.iterdir(): @@ -106,7 +115,7 @@ def find_prefixed(root, prefix): yield x -def extract_suffixes(iter, prefix): +def extract_suffixes(iter: Iterable[PurePath], prefix: str) -> Iterator[str]: """ :param iter: iterator over path names :param prefix: expected prefix of the path names @@ -117,13 +126,13 @@ def extract_suffixes(iter, prefix): yield p.name[p_len:] -def find_suffixes(root, prefix): +def find_suffixes(root: Path, prefix: str) -> Iterator[str]: """combines find_prefixes and extract_suffixes """ return extract_suffixes(find_prefixed(root, prefix), prefix) -def parse_num(maybe_num): +def parse_num(maybe_num) -> int: """parses number path suffixes, returns -1 on error""" try: return int(maybe_num) @@ -131,7 +140,9 @@ def parse_num(maybe_num): return -1 -def _force_symlink(root, target, link_to): +def _force_symlink( + root: Path, target: Union[str, PurePath], link_to: Union[str, Path] +) -> None: """helper to create the current symlink it's full of race conditions that are reasonably ok to ignore @@ -151,7 +162,7 @@ def _force_symlink(root, target, link_to): pass -def make_numbered_dir(root, prefix): +def make_numbered_dir(root: Path, prefix: str) -> Path: """create a directory with an increased number as suffix for the given prefix""" for i in range(10): # try up to 10 times to create the folder @@ -172,7 +183,7 @@ def make_numbered_dir(root, prefix): ) -def create_cleanup_lock(p): +def create_cleanup_lock(p: Path) -> Path: """crates a lock to prevent premature folder cleanup""" lock_path = get_lock_path(p) try: @@ -189,11 +200,11 @@ def create_cleanup_lock(p): return lock_path -def register_cleanup_lock_removal(lock_path, register=atexit.register): +def register_cleanup_lock_removal(lock_path: Path, register=atexit.register): """registers a cleanup function for removing a lock, by default on atexit""" pid = os.getpid() - def cleanup_on_exit(lock_path=lock_path, original_pid=pid): + def cleanup_on_exit(lock_path: Path = lock_path, original_pid: int = pid) -> None: current_pid = os.getpid() if current_pid != original_pid: # fork @@ -206,7 +217,7 @@ def register_cleanup_lock_removal(lock_path, register=atexit.register): return register(cleanup_on_exit) -def maybe_delete_a_numbered_dir(path): +def maybe_delete_a_numbered_dir(path: Path) -> None: """removes a numbered directory if its lock can be obtained and it does not seem to be in use""" lock_path = None try: @@ -232,7 +243,7 @@ def maybe_delete_a_numbered_dir(path): pass -def ensure_deletable(path, consider_lock_dead_if_created_before): +def ensure_deletable(path: Path, consider_lock_dead_if_created_before: float) -> bool: """checks if a lock exists and breaks it if its considered dead""" if path.is_symlink(): return False @@ -251,13 +262,13 @@ def ensure_deletable(path, consider_lock_dead_if_created_before): return False -def try_cleanup(path, consider_lock_dead_if_created_before): +def try_cleanup(path: Path, consider_lock_dead_if_created_before: float) -> None: """tries to cleanup a folder if we can ensure it's deletable""" if ensure_deletable(path, consider_lock_dead_if_created_before): maybe_delete_a_numbered_dir(path) -def cleanup_candidates(root, prefix, keep): +def cleanup_candidates(root: Path, prefix: str, keep: int) -> Iterator[Path]: """lists candidates for numbered directories to be removed - follows py.path""" max_existing = max(map(parse_num, find_suffixes(root, prefix)), default=-1) max_delete = max_existing - keep @@ -269,7 +280,9 @@ def cleanup_candidates(root, prefix, keep): yield path -def cleanup_numbered_dir(root, prefix, keep, consider_lock_dead_if_created_before): +def cleanup_numbered_dir( + root: Path, prefix: str, keep: int, consider_lock_dead_if_created_before: float +) -> None: """cleanup for lock driven numbered directories""" for path in cleanup_candidates(root, prefix, keep): try_cleanup(path, consider_lock_dead_if_created_before) @@ -277,7 +290,9 @@ def cleanup_numbered_dir(root, prefix, keep, consider_lock_dead_if_created_befor try_cleanup(path, consider_lock_dead_if_created_before) -def make_numbered_dir_with_cleanup(root, prefix, keep, lock_timeout): +def make_numbered_dir_with_cleanup( + root: Path, prefix: str, keep: int, lock_timeout: float +) -> Path: """creates a numbered dir with a cleanup lock and removes old ones""" e = None for i in range(10): @@ -311,7 +326,7 @@ def resolve_from_str(input, root): return root.joinpath(input) -def fnmatch_ex(pattern, path): +def fnmatch_ex(pattern: str, path) -> bool: """FNMatcher port from py.path.common which works with PurePath() instances. The difference between this algorithm and PurePath.match() is that the latter matches "**" glob expressions @@ -346,6 +361,6 @@ def fnmatch_ex(pattern, path): return fnmatch.fnmatch(name, pattern) -def parts(s): +def parts(s: str) -> Set[str]: parts = s.split(sep) return {sep.join(parts[: i + 1]) or sep for i in range(len(parts))}