Merge pull request #5691 from nicoddemus/backport-5627

[4.6] Handle only known functions in rm_rf (#5627)
This commit is contained in:
Anthony Sottile 2019-08-04 19:08:01 -07:00 committed by GitHub
commit a19ae2af22
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 94 additions and 44 deletions

View File

@ -11,6 +11,7 @@ import shutil
import sys import sys
import uuid import uuid
import warnings import warnings
from functools import partial
from functools import reduce from functools import reduce
from os.path import expanduser from os.path import expanduser
from os.path import expandvars from os.path import expandvars
@ -46,38 +47,53 @@ def ensure_reset_dir(path):
path.mkdir() path.mkdir()
def on_rm_rf_error(func, path, exc, **kwargs):
"""Handles known read-only errors during rmtree."""
start_path = kwargs["start_path"]
excvalue = exc[1]
if not isinstance(excvalue, OSError) or excvalue.errno not in (
errno.EACCES,
errno.EPERM,
):
warnings.warn(
PytestWarning("(rm_rf) error removing {}: {}".format(path, excvalue))
)
return
if func not in (os.rmdir, os.remove, os.unlink):
warnings.warn(
PytestWarning("(rm_rf) error removing {}: {}".format(path, excvalue))
)
return
# Chmod + retry.
import stat
def chmod_rw(p):
mode = os.stat(p).st_mode
os.chmod(p, mode | stat.S_IRUSR | stat.S_IWUSR)
# For files, we need to recursively go upwards in the directories to
# ensure they all are also writable.
p = Path(path)
if p.is_file():
for parent in p.parents:
chmod_rw(str(parent))
# stop when we reach the original path passed to rm_rf
if parent == start_path:
break
chmod_rw(str(path))
func(path)
def rm_rf(path): def rm_rf(path):
"""Remove the path contents recursively, even if some elements """Remove the path contents recursively, even if some elements
are read-only. are read-only.
""" """
onerror = partial(on_rm_rf_error, start_path=path)
def chmod_w(p): shutil.rmtree(str(path), onerror=onerror)
import stat
mode = os.stat(str(p)).st_mode
os.chmod(str(p), mode | stat.S_IWRITE)
def force_writable_and_retry(function, p, excinfo):
p = Path(p)
# for files, we need to recursively go upwards
# in the directories to ensure they all are also
# writable
if p.is_file():
for parent in p.parents:
chmod_w(parent)
# stop when we reach the original path passed to rm_rf
if parent == path:
break
chmod_w(p)
try:
# retry the function that failed
function(str(p))
except Exception as e:
warnings.warn(PytestWarning("(rm_rf) error removing {}: {}".format(p, e)))
shutil.rmtree(str(path), onerror=force_writable_and_retry)
def find_prefixed(root, prefix): def find_prefixed(root, prefix):

View File

@ -3,6 +3,7 @@ from __future__ import absolute_import
from __future__ import division from __future__ import division
from __future__ import print_function from __future__ import print_function
import errno
import os import os
import stat import stat
import sys import sys
@ -319,7 +320,20 @@ class TestNumberedDir(object):
p, consider_lock_dead_if_created_before=p.stat().st_mtime + 1 p, consider_lock_dead_if_created_before=p.stat().st_mtime + 1
) )
def test_rmtree(self, tmp_path): def test_cleanup_ignores_symlink(self, tmp_path):
the_symlink = tmp_path / (self.PREFIX + "current")
attempt_symlink_to(the_symlink, tmp_path / (self.PREFIX + "5"))
self._do_cleanup(tmp_path)
def test_removal_accepts_lock(self, tmp_path):
folder = pathlib.make_numbered_dir(root=tmp_path, prefix=self.PREFIX)
pathlib.create_cleanup_lock(folder)
pathlib.maybe_delete_a_numbered_dir(folder)
assert folder.is_dir()
class TestRmRf:
def test_rm_rf(self, tmp_path):
from _pytest.pathlib import rm_rf from _pytest.pathlib import rm_rf
adir = tmp_path / "adir" adir = tmp_path / "adir"
@ -335,7 +349,7 @@ class TestNumberedDir(object):
rm_rf(adir) rm_rf(adir)
assert not adir.exists() assert not adir.exists()
def test_rmtree_with_read_only_file(self, tmp_path): def test_rm_rf_with_read_only_file(self, tmp_path):
"""Ensure rm_rf can remove directories with read-only files in them (#5524)""" """Ensure rm_rf can remove directories with read-only files in them (#5524)"""
from _pytest.pathlib import rm_rf from _pytest.pathlib import rm_rf
@ -344,14 +358,17 @@ class TestNumberedDir(object):
fn.touch() fn.touch()
mode = os.stat(str(fn)).st_mode self.chmod_r(fn)
os.chmod(str(fn), mode & ~stat.S_IWRITE)
rm_rf(fn.parent) rm_rf(fn.parent)
assert not fn.parent.is_dir() assert not fn.parent.is_dir()
def test_rmtree_with_read_only_directory(self, tmp_path): def chmod_r(self, path):
mode = os.stat(str(path)).st_mode
os.chmod(str(path), mode & ~stat.S_IWRITE)
def test_rm_rf_with_read_only_directory(self, tmp_path):
"""Ensure rm_rf can remove read-only directories (#5524)""" """Ensure rm_rf can remove read-only directories (#5524)"""
from _pytest.pathlib import rm_rf from _pytest.pathlib import rm_rf
@ -359,23 +376,40 @@ class TestNumberedDir(object):
adir.mkdir() adir.mkdir()
(adir / "foo.txt").touch() (adir / "foo.txt").touch()
mode = os.stat(str(adir)).st_mode self.chmod_r(adir)
os.chmod(str(adir), mode & ~stat.S_IWRITE)
rm_rf(adir) rm_rf(adir)
assert not adir.is_dir() assert not adir.is_dir()
def test_cleanup_ignores_symlink(self, tmp_path): def test_on_rm_rf_error(self, tmp_path):
the_symlink = tmp_path / (self.PREFIX + "current") from _pytest.pathlib import on_rm_rf_error
attempt_symlink_to(the_symlink, tmp_path / (self.PREFIX + "5"))
self._do_cleanup(tmp_path)
def test_removal_accepts_lock(self, tmp_path): adir = tmp_path / "dir"
folder = pathlib.make_numbered_dir(root=tmp_path, prefix=self.PREFIX) adir.mkdir()
pathlib.create_cleanup_lock(folder)
pathlib.maybe_delete_a_numbered_dir(folder) fn = adir / "foo.txt"
assert folder.is_dir() fn.touch()
self.chmod_r(fn)
# unknown exception
with pytest.warns(pytest.PytestWarning):
exc_info = (None, RuntimeError(), None)
on_rm_rf_error(os.unlink, str(fn), exc_info, start_path=tmp_path)
assert fn.is_file()
permission_error = OSError()
permission_error.errno = errno.EACCES
# unknown function
with pytest.warns(pytest.PytestWarning):
exc_info = (None, permission_error, None)
on_rm_rf_error(None, str(fn), exc_info, start_path=tmp_path)
assert fn.is_file()
exc_info = (None, permission_error, None)
on_rm_rf_error(os.unlink, str(fn), exc_info, start_path=tmp_path)
assert not fn.is_file()
def attempt_symlink_to(path, to_path): def attempt_symlink_to(path, to_path):