Add docstring and use tmp_path in tests

Related to #10114
This commit is contained in:
Bruno Oliveira 2022-07-08 18:39:03 -03:00
parent 2c1effec33
commit 57945bd8b1
2 changed files with 97 additions and 80 deletions

View File

@ -1,5 +1,10 @@
"""
Module copied over from https://github.com/untitaker/python-atomicwrites, which has become
unmaintained.
Since then, we have made changes to simplify the code, focusing on pytest's use-case.
"""
import contextlib import contextlib
import io
import os import os
import sys import sys
import tempfile import tempfile
@ -15,7 +20,7 @@ try:
except ImportError: except ImportError:
fspath = None fspath = None
__version__ = '1.4.1' __version__ = "1.4.1"
PY2 = sys.version_info[0] == 2 PY2 = sys.version_info[0] == 2
@ -35,8 +40,9 @@ DEFAULT_MODE = "wb" if PY2 else "w"
_proper_fsync = os.fsync _proper_fsync = os.fsync
if sys.platform != 'win32': if sys.platform != "win32":
if hasattr(fcntl, 'F_FULLFSYNC'): if hasattr(fcntl, "F_FULLFSYNC"):
def _proper_fsync(fd): def _proper_fsync(fd):
# https://lists.apple.com/archives/darwin-dev/2005/Feb/msg00072.html # https://lists.apple.com/archives/darwin-dev/2005/Feb/msg00072.html
# https://developer.apple.com/library/mac/documentation/Darwin/Reference/ManPages/man2/fsync.2.html # https://developer.apple.com/library/mac/documentation/Darwin/Reference/ManPages/man2/fsync.2.html
@ -64,6 +70,7 @@ if sys.platform != 'win32':
_sync_directory(dst_dir) _sync_directory(dst_dir)
if src_dir != dst_dir: if src_dir != dst_dir:
_sync_directory(src_dir) _sync_directory(src_dir)
else: else:
from ctypes import windll, WinError from ctypes import windll, WinError
@ -76,43 +83,47 @@ else:
raise WinError() raise WinError()
def _replace_atomic(src, dst): def _replace_atomic(src, dst):
_handle_errors(windll.kernel32.MoveFileExW( _handle_errors(
_path_to_unicode(src), _path_to_unicode(dst), windll.kernel32.MoveFileExW(
_windows_default_flags | _MOVEFILE_REPLACE_EXISTING _path_to_unicode(src),
)) _path_to_unicode(dst),
_windows_default_flags | _MOVEFILE_REPLACE_EXISTING,
)
)
def _move_atomic(src, dst): def _move_atomic(src, dst):
_handle_errors(windll.kernel32.MoveFileExW( _handle_errors(
_path_to_unicode(src), _path_to_unicode(dst), windll.kernel32.MoveFileExW(
_windows_default_flags _path_to_unicode(src), _path_to_unicode(dst), _windows_default_flags
)) )
)
def replace_atomic(src, dst): def replace_atomic(src, dst):
''' """
Move ``src`` to ``dst``. If ``dst`` exists, it will be silently Move ``src`` to ``dst``. If ``dst`` exists, it will be silently
overwritten. overwritten.
Both paths must reside on the same filesystem for the operation to be Both paths must reside on the same filesystem for the operation to be
atomic. atomic.
''' """
return _replace_atomic(src, dst) return _replace_atomic(src, dst)
def move_atomic(src, dst): def move_atomic(src, dst):
''' """
Move ``src`` to ``dst``. There might a timewindow where both filesystem Move ``src`` to ``dst``. There might a timewindow where both filesystem
entries exist. If ``dst`` already exists, :py:exc:`FileExistsError` will be entries exist. If ``dst`` already exists, :py:exc:`FileExistsError` will be
raised. raised.
Both paths must reside on the same filesystem for the operation to be Both paths must reside on the same filesystem for the operation to be
atomic. atomic.
''' """
return _move_atomic(src, dst) return _move_atomic(src, dst)
class AtomicWriter(object): class AtomicWriter:
''' """
A helper class for performing atomic writes. Usage:: A helper class for performing atomic writes. Usage::
with AtomicWriter(path).open() as f: with AtomicWriter(path).open() as f:
@ -130,21 +141,20 @@ class AtomicWriter(object):
If you need further control over the exact behavior, you are encouraged to If you need further control over the exact behavior, you are encouraged to
subclass. subclass.
''' """
def __init__(self, path, mode=DEFAULT_MODE, overwrite=False, def __init__(self, path, mode=DEFAULT_MODE, overwrite=False, **open_kwargs):
**open_kwargs): if "a" in mode:
if 'a' in mode:
raise ValueError( raise ValueError(
'Appending to an existing file is not supported, because that ' "Appending to an existing file is not supported, because that "
'would involve an expensive `copy`-operation to a temporary ' "would involve an expensive `copy`-operation to a temporary "
'file. Open the file in normal `w`-mode and copy explicitly ' "file. Open the file in normal `w`-mode and copy explicitly "
'if that\'s what you\'re after.' "if that's what you're after."
) )
if 'x' in mode: if "x" in mode:
raise ValueError('Use the `overwrite`-parameter instead.') raise ValueError("Use the `overwrite`-parameter instead.")
if 'w' not in mode: if "w" not in mode:
raise ValueError('AtomicWriters can only be written to.') raise ValueError("AtomicWriters can only be written to.")
# Attempt to convert `path` to `str` or `bytes` # Attempt to convert `path` to `str` or `bytes`
if fspath is not None: if fspath is not None:
@ -156,9 +166,9 @@ class AtomicWriter(object):
self._open_kwargs = open_kwargs self._open_kwargs = open_kwargs
def open(self): def open(self):
''' """
Open the temporary file. Open the temporary file.
''' """
return self._open(self.get_fileobject) return self._open(self.get_fileobject)
@contextlib.contextmanager @contextlib.contextmanager
@ -178,41 +188,41 @@ class AtomicWriter(object):
except Exception: except Exception:
pass pass
def get_fileobject(self, suffix="", prefix=tempfile.gettempprefix(), def get_fileobject(
dir=None, **kwargs): self, suffix="", prefix=tempfile.gettempprefix(), dir=None, **kwargs
'''Return the temporary file to use.''' ):
"""Return the temporary file to use."""
if dir is None: if dir is None:
dir = os.path.normpath(os.path.dirname(self._path)) dir = os.path.normpath(os.path.dirname(self._path))
descriptor, name = tempfile.mkstemp(suffix=suffix, prefix=prefix, descriptor, name = tempfile.mkstemp(suffix=suffix, prefix=prefix, dir=dir)
dir=dir)
# io.open() will take either the descriptor or the name, but we need # io.open() will take either the descriptor or the name, but we need
# the name later for commit()/replace_atomic() and couldn't find a way # the name later for commit()/replace_atomic() and couldn't find a way
# to get the filename from the descriptor. # to get the filename from the descriptor.
os.close(descriptor) os.close(descriptor)
kwargs['mode'] = self._mode kwargs["mode"] = self._mode
kwargs['file'] = name kwargs["file"] = name
return io.open(**kwargs) return open(**kwargs)
def sync(self, f): def sync(self, f):
'''responsible for clearing as many file caches as possible before """responsible for clearing as many file caches as possible before
commit''' commit"""
f.flush() f.flush()
_proper_fsync(f.fileno()) _proper_fsync(f.fileno())
def commit(self, f): def commit(self, f):
'''Move the temporary file to the target location.''' """Move the temporary file to the target location."""
if self._overwrite: if self._overwrite:
replace_atomic(f.name, self._path) replace_atomic(f.name, self._path)
else: else:
move_atomic(f.name, self._path) move_atomic(f.name, self._path)
def rollback(self, f): def rollback(self, f):
'''Clean up all temporary resources.''' """Clean up all temporary resources."""
os.unlink(f.name) os.unlink(f.name)
def atomic_write(path, writer_cls=AtomicWriter, **cls_kwargs): def atomic_write(path, writer_cls=AtomicWriter, **cls_kwargs):
''' """
Simple atomic writes. This wraps :py:class:`AtomicWriter`:: Simple atomic writes. This wraps :py:class:`AtomicWriter`::
with atomic_write(path) as f: with atomic_write(path) as f:
@ -225,5 +235,5 @@ def atomic_write(path, writer_cls=AtomicWriter, **cls_kwargs):
Additional keyword arguments are passed to the writer class. See Additional keyword arguments are passed to the writer class. See
:py:class:`AtomicWriter`. :py:class:`AtomicWriter`.
''' """
return writer_cls(path, **cls_kwargs).open() return writer_cls(path, **cls_kwargs).open()

View File

@ -1,63 +1,69 @@
"""
Module copied over from https://github.com/untitaker/python-atomicwrites, which has become
unmaintained.
Since then, we have made changes to simplify the code, focusing on pytest's use-case.
"""
import errno import errno
import os import os
from pathlib import Path
from atomicwrites import atomic_write
import pytest import pytest
from _pytest.atomic_writes import atomic_write
def test_atomic_write(tmpdir): def test_atomic_write(tmp_path: Path) -> None:
fname = tmpdir.join('ha') fname = tmp_path.joinpath("ha")
for i in range(2): for i in range(2):
with atomic_write(str(fname), overwrite=True) as f: with atomic_write(str(fname), overwrite=True) as f:
f.write('hoho') f.write("hoho")
with pytest.raises(OSError) as excinfo: with pytest.raises(OSError) as excinfo:
with atomic_write(str(fname), overwrite=False) as f: with atomic_write(str(fname), overwrite=False) as f:
f.write('haha') f.write("haha")
assert excinfo.value.errno == errno.EEXIST assert excinfo.value.errno == errno.EEXIST
assert fname.read() == 'hoho' assert fname.read_text() == "hoho"
assert len(tmpdir.listdir()) == 1 assert len(list(tmp_path.iterdir())) == 1
def test_teardown(tmpdir): def test_teardown(tmp_path: Path) -> None:
fname = tmpdir.join('ha') fname = tmp_path.joinpath("ha")
with pytest.raises(AssertionError): with pytest.raises(AssertionError):
with atomic_write(str(fname), overwrite=True): with atomic_write(str(fname), overwrite=True):
assert False assert False
assert not tmpdir.listdir() assert not list(tmp_path.iterdir())
def test_replace_simultaneously_created_file(tmpdir): def test_replace_simultaneously_created_file(tmp_path: Path) -> None:
fname = tmpdir.join('ha') fname = tmp_path.joinpath("ha")
with atomic_write(str(fname), overwrite=True) as f: with atomic_write(str(fname), overwrite=True) as f:
f.write('hoho') f.write("hoho")
fname.write('harhar') fname.write_text("harhar")
assert fname.read() == 'harhar' assert fname.read_text() == "harhar"
assert fname.read() == 'hoho' assert fname.read_text() == "hoho"
assert len(tmpdir.listdir()) == 1 assert len(list(tmp_path.iterdir())) == 1
def test_dont_remove_simultaneously_created_file(tmpdir): def test_dont_remove_simultaneously_created_file(tmp_path: Path) -> None:
fname = tmpdir.join('ha') fname = tmp_path.joinpath("ha")
with pytest.raises(OSError) as excinfo: with pytest.raises(OSError) as excinfo:
with atomic_write(str(fname), overwrite=False) as f: with atomic_write(str(fname), overwrite=False) as f:
f.write('hoho') f.write("hoho")
fname.write('harhar') fname.write_text("harhar")
assert fname.read() == 'harhar' assert fname.read_text() == "harhar"
assert excinfo.value.errno == errno.EEXIST assert excinfo.value.errno == errno.EEXIST
assert fname.read() == 'harhar' assert fname.read_text() == "harhar"
assert len(tmpdir.listdir()) == 1 assert len(list(tmp_path.iterdir())) == 1
# Verify that nested exceptions during rollback do not overwrite the initial # Verify that nested exceptions during rollback do not overwrite the initial
# exception that triggered a rollback. # exception that triggered a rollback.
def test_open_reraise(tmpdir): def test_open_reraise(tmp_path: Path) -> None:
fname = tmpdir.join('ha') fname = tmp_path.joinpath("ha")
with pytest.raises(AssertionError): with pytest.raises(AssertionError):
aw = atomic_write(str(fname), overwrite=False) aw = atomic_write(str(fname), overwrite=False)
with aw: with aw:
@ -70,22 +76,23 @@ def test_open_reraise(tmpdir):
assert False, "Intentional failure for testing purposes" assert False, "Intentional failure for testing purposes"
def test_atomic_write_in_pwd(tmpdir): def test_atomic_write_in_pwd(tmp_path: Path) -> None:
orig_curdir = os.getcwd() orig_curdir = os.getcwd()
try: try:
os.chdir(str(tmpdir)) os.chdir(str(tmp_path))
fname = 'ha' fname = "ha"
for i in range(2): for i in range(2):
with atomic_write(str(fname), overwrite=True) as f: with atomic_write(str(fname), overwrite=True) as f:
f.write('hoho') f.write("hoho")
with pytest.raises(OSError) as excinfo: with pytest.raises(OSError) as excinfo:
with atomic_write(str(fname), overwrite=False) as f: with atomic_write(str(fname), overwrite=False) as f:
f.write('haha') f.write("haha")
assert excinfo.value.errno == errno.EEXIST assert excinfo.value.errno == errno.EEXIST
assert open(fname).read() == 'hoho' with open(fname) as f:
assert len(tmpdir.listdir()) == 1 assert f.read() == "hoho"
assert len(list(tmp_path.iterdir())) == 1
finally: finally:
os.chdir(orig_curdir) os.chdir(orig_curdir)