Handle only known functions in rm_rf (#5627)
Handle only known functions in rm_rf
This commit is contained in:
		
						commit
						0d3958e8de
					
				|  | @ -8,6 +8,7 @@ import shutil | ||||||
| import sys | import sys | ||||||
| import uuid | import uuid | ||||||
| import warnings | import warnings | ||||||
|  | from functools import partial | ||||||
| from os.path import expanduser | from os.path import expanduser | ||||||
| from os.path import expandvars | from os.path import expandvars | ||||||
| from os.path import isabs | from os.path import isabs | ||||||
|  | @ -38,38 +39,49 @@ def ensure_reset_dir(path): | ||||||
|     path.mkdir() |     path.mkdir() | ||||||
| 
 | 
 | ||||||
| 
 | 
 | ||||||
| def rm_rf(path): | def on_rm_rf_error(func, path: str, exc, *, start_path): | ||||||
|  |     """Handles known read-only errors during rmtree.""" | ||||||
|  |     excvalue = exc[1] | ||||||
|  | 
 | ||||||
|  |     if not isinstance(excvalue, PermissionError): | ||||||
|  |         warnings.warn( | ||||||
|  |             PytestWarning("(rm_rf) error removing {}: {}".format(path, excvalue)) | ||||||
|  |         ) | ||||||
|  |         return | ||||||
|  | 
 | ||||||
|  |     if func not in (os.rmdir, os.remove, os.unlink): | ||||||
|  |         warnings.warn( | ||||||
|  |             PytestWarning("(rm_rf) error removing {}: {}".format(path, excvalue)) | ||||||
|  |         ) | ||||||
|  |         return | ||||||
|  | 
 | ||||||
|  |     # Chmod + retry. | ||||||
|  |     import stat | ||||||
|  | 
 | ||||||
|  |     def chmod_rw(p: str): | ||||||
|  |         mode = os.stat(p).st_mode | ||||||
|  |         os.chmod(p, mode | stat.S_IRUSR | stat.S_IWUSR) | ||||||
|  | 
 | ||||||
|  |     # For files, we need to recursively go upwards in the directories to | ||||||
|  |     # ensure they all are also writable. | ||||||
|  |     p = Path(path) | ||||||
|  |     if p.is_file(): | ||||||
|  |         for parent in p.parents: | ||||||
|  |             chmod_rw(str(parent)) | ||||||
|  |             # stop when we reach the original path passed to rm_rf | ||||||
|  |             if parent == start_path: | ||||||
|  |                 break | ||||||
|  |     chmod_rw(str(path)) | ||||||
|  | 
 | ||||||
|  |     func(path) | ||||||
|  | 
 | ||||||
|  | 
 | ||||||
|  | def rm_rf(path: Path): | ||||||
|     """Remove the path contents recursively, even if some elements |     """Remove the path contents recursively, even if some elements | ||||||
|     are read-only. |     are read-only. | ||||||
|     """ |     """ | ||||||
| 
 |     onerror = partial(on_rm_rf_error, start_path=path) | ||||||
|     def chmod_w(p): |     shutil.rmtree(str(path), onerror=onerror) | ||||||
|         import stat |  | ||||||
| 
 |  | ||||||
|         mode = os.stat(str(p)).st_mode |  | ||||||
|         os.chmod(str(p), mode | stat.S_IWRITE) |  | ||||||
| 
 |  | ||||||
|     def force_writable_and_retry(function, p, excinfo): |  | ||||||
|         p = Path(p) |  | ||||||
| 
 |  | ||||||
|         # for files, we need to recursively go upwards |  | ||||||
|         # in the directories to ensure they all are also |  | ||||||
|         # writable |  | ||||||
|         if p.is_file(): |  | ||||||
|             for parent in p.parents: |  | ||||||
|                 chmod_w(parent) |  | ||||||
|                 # stop when we reach the original path passed to rm_rf |  | ||||||
|                 if parent == path: |  | ||||||
|                     break |  | ||||||
| 
 |  | ||||||
|         chmod_w(p) |  | ||||||
|         try: |  | ||||||
|             # retry the function that failed |  | ||||||
|             function(str(p)) |  | ||||||
|         except Exception as e: |  | ||||||
|             warnings.warn(PytestWarning("(rm_rf) error removing {}: {}".format(p, e))) |  | ||||||
| 
 |  | ||||||
|     shutil.rmtree(str(path), onerror=force_writable_and_retry) |  | ||||||
| 
 | 
 | ||||||
| 
 | 
 | ||||||
| def find_prefixed(root, prefix): | def find_prefixed(root, prefix): | ||||||
|  |  | ||||||
|  | @ -312,7 +312,20 @@ class TestNumberedDir: | ||||||
|             p, consider_lock_dead_if_created_before=p.stat().st_mtime + 1 |             p, consider_lock_dead_if_created_before=p.stat().st_mtime + 1 | ||||||
|         ) |         ) | ||||||
| 
 | 
 | ||||||
|     def test_rmtree(self, tmp_path): |     def test_cleanup_ignores_symlink(self, tmp_path): | ||||||
|  |         the_symlink = tmp_path / (self.PREFIX + "current") | ||||||
|  |         attempt_symlink_to(the_symlink, tmp_path / (self.PREFIX + "5")) | ||||||
|  |         self._do_cleanup(tmp_path) | ||||||
|  | 
 | ||||||
|  |     def test_removal_accepts_lock(self, tmp_path): | ||||||
|  |         folder = pathlib.make_numbered_dir(root=tmp_path, prefix=self.PREFIX) | ||||||
|  |         pathlib.create_cleanup_lock(folder) | ||||||
|  |         pathlib.maybe_delete_a_numbered_dir(folder) | ||||||
|  |         assert folder.is_dir() | ||||||
|  | 
 | ||||||
|  | 
 | ||||||
|  | class TestRmRf: | ||||||
|  |     def test_rm_rf(self, tmp_path): | ||||||
|         from _pytest.pathlib import rm_rf |         from _pytest.pathlib import rm_rf | ||||||
| 
 | 
 | ||||||
|         adir = tmp_path / "adir" |         adir = tmp_path / "adir" | ||||||
|  | @ -328,7 +341,7 @@ class TestNumberedDir: | ||||||
|         rm_rf(adir) |         rm_rf(adir) | ||||||
|         assert not adir.exists() |         assert not adir.exists() | ||||||
| 
 | 
 | ||||||
|     def test_rmtree_with_read_only_file(self, tmp_path): |     def test_rm_rf_with_read_only_file(self, tmp_path): | ||||||
|         """Ensure rm_rf can remove directories with read-only files in them (#5524)""" |         """Ensure rm_rf can remove directories with read-only files in them (#5524)""" | ||||||
|         from _pytest.pathlib import rm_rf |         from _pytest.pathlib import rm_rf | ||||||
| 
 | 
 | ||||||
|  | @ -337,14 +350,17 @@ class TestNumberedDir: | ||||||
| 
 | 
 | ||||||
|         fn.touch() |         fn.touch() | ||||||
| 
 | 
 | ||||||
|         mode = os.stat(str(fn)).st_mode |         self.chmod_r(fn) | ||||||
|         os.chmod(str(fn), mode & ~stat.S_IWRITE) |  | ||||||
| 
 | 
 | ||||||
|         rm_rf(fn.parent) |         rm_rf(fn.parent) | ||||||
| 
 | 
 | ||||||
|         assert not fn.parent.is_dir() |         assert not fn.parent.is_dir() | ||||||
| 
 | 
 | ||||||
|     def test_rmtree_with_read_only_directory(self, tmp_path): |     def chmod_r(self, path): | ||||||
|  |         mode = os.stat(str(path)).st_mode | ||||||
|  |         os.chmod(str(path), mode & ~stat.S_IWRITE) | ||||||
|  | 
 | ||||||
|  |     def test_rm_rf_with_read_only_directory(self, tmp_path): | ||||||
|         """Ensure rm_rf can remove read-only directories (#5524)""" |         """Ensure rm_rf can remove read-only directories (#5524)""" | ||||||
|         from _pytest.pathlib import rm_rf |         from _pytest.pathlib import rm_rf | ||||||
| 
 | 
 | ||||||
|  | @ -352,23 +368,37 @@ class TestNumberedDir: | ||||||
|         adir.mkdir() |         adir.mkdir() | ||||||
| 
 | 
 | ||||||
|         (adir / "foo.txt").touch() |         (adir / "foo.txt").touch() | ||||||
|         mode = os.stat(str(adir)).st_mode |         self.chmod_r(adir) | ||||||
|         os.chmod(str(adir), mode & ~stat.S_IWRITE) |  | ||||||
| 
 | 
 | ||||||
|         rm_rf(adir) |         rm_rf(adir) | ||||||
| 
 | 
 | ||||||
|         assert not adir.is_dir() |         assert not adir.is_dir() | ||||||
| 
 | 
 | ||||||
|     def test_cleanup_ignores_symlink(self, tmp_path): |     def test_on_rm_rf_error(self, tmp_path): | ||||||
|         the_symlink = tmp_path / (self.PREFIX + "current") |         from _pytest.pathlib import on_rm_rf_error | ||||||
|         attempt_symlink_to(the_symlink, tmp_path / (self.PREFIX + "5")) |  | ||||||
|         self._do_cleanup(tmp_path) |  | ||||||
| 
 | 
 | ||||||
|     def test_removal_accepts_lock(self, tmp_path): |         adir = tmp_path / "dir" | ||||||
|         folder = pathlib.make_numbered_dir(root=tmp_path, prefix=self.PREFIX) |         adir.mkdir() | ||||||
|         pathlib.create_cleanup_lock(folder) | 
 | ||||||
|         pathlib.maybe_delete_a_numbered_dir(folder) |         fn = adir / "foo.txt" | ||||||
|         assert folder.is_dir() |         fn.touch() | ||||||
|  |         self.chmod_r(fn) | ||||||
|  | 
 | ||||||
|  |         # unknown exception | ||||||
|  |         with pytest.warns(pytest.PytestWarning): | ||||||
|  |             exc_info = (None, RuntimeError(), None) | ||||||
|  |             on_rm_rf_error(os.unlink, str(fn), exc_info, start_path=tmp_path) | ||||||
|  |             assert fn.is_file() | ||||||
|  | 
 | ||||||
|  |         # unknown function | ||||||
|  |         with pytest.warns(pytest.PytestWarning): | ||||||
|  |             exc_info = (None, PermissionError(), None) | ||||||
|  |             on_rm_rf_error(None, str(fn), exc_info, start_path=tmp_path) | ||||||
|  |             assert fn.is_file() | ||||||
|  | 
 | ||||||
|  |         exc_info = (None, PermissionError(), None) | ||||||
|  |         on_rm_rf_error(os.unlink, str(fn), exc_info, start_path=tmp_path) | ||||||
|  |         assert not fn.is_file() | ||||||
| 
 | 
 | ||||||
| 
 | 
 | ||||||
| def attempt_symlink_to(path, to_path): | def attempt_symlink_to(path, to_path): | ||||||
|  |  | ||||||
		Loading…
	
		Reference in New Issue