[pre-commit.ci] auto fixes from pre-commit.com hooks

for more information, see https://pre-commit.ci
This commit is contained in:
pre-commit-ci[bot] 2022-10-19 14:19:24 +00:00 committed by Anthony Sottile
parent 19dda7c9bd
commit 49abbf2485
4 changed files with 389 additions and 328 deletions

View File

@ -2,12 +2,16 @@
create errno-specific classes for IO or os calls.
"""
import errno
import os
import sys
from types import ModuleType
import sys, os, errno
class Error(EnvironmentError):
def __repr__(self):
return "%s.%s %r: %s " %(self.__class__.__module__,
return "{}.{} {!r}: {} ".format(
self.__class__.__module__,
self.__class__.__name__,
self.__class__.__doc__,
" ".join(map(str, self.args)),
@ -15,11 +19,13 @@ class Error(EnvironmentError):
)
def __str__(self):
s = "[%s]: %s" %(self.__class__.__doc__,
s = "[{}]: {}".format(
self.__class__.__doc__,
" ".join(map(str, self.args)),
)
return s
_winerrnomap = {
2: errno.ENOENT,
3: errno.ENOENT,
@ -32,11 +38,13 @@ _winerrnomap = {
5: errno.EACCES, # anything better?
}
class ErrorMaker(ModuleType):
"""lazily provides Exception classes for each possible POSIX errno
(as defined per the 'errno' module). All such instances
subclass EnvironmentError.
"""
Error = Error
_errno2class = {}
@ -53,9 +61,11 @@ class ErrorMaker(ModuleType):
return self._errno2class[eno]
except KeyError:
clsname = errno.errorcode.get(eno, "UnknownErrno%d" % (eno,))
errorcls = type(Error)(clsname, (Error,),
{'__module__':'py.error',
'__doc__': os.strerror(eno)})
errorcls = type(Error)(
clsname,
(Error,),
{"__module__": "py.error", "__doc__": os.strerror(eno)},
)
self._errno2class[eno] = errorcls
return errorcls
@ -66,9 +76,9 @@ class ErrorMaker(ModuleType):
return func(*args, **kwargs)
except self.Error:
raise
except (OSError, EnvironmentError):
except OSError:
cls, value, tb = sys.exc_info()
if not hasattr(value, 'errno'):
if not hasattr(value, "errno"):
raise
__tracebackhide__ = False
errno = value.errno
@ -83,9 +93,9 @@ class ErrorMaker(ModuleType):
cls = self._geterrnoclass(_winerrnomap[errno])
except KeyError:
raise value
raise cls("%s%r" % (func.__name__, args))
raise cls(f"{func.__name__}{args!r}")
__tracebackhide__ = True
error = ErrorMaker('_pytest._py.error')
error = ErrorMaker("_pytest._py.error")
sys.modules[error.__name__] = error

View File

@ -1,24 +1,31 @@
"""
local path implementation.
"""
from __future__ import with_statement
from contextlib import contextmanager
import sys, os, atexit, io, uuid
from stat import S_ISLNK, S_ISDIR, S_ISREG
from os.path import abspath, normpath, isabs, exists, isdir, isfile, islink, dirname
import warnings
import os
import sys
import posixpath
import atexit
import fnmatch
import io
import os
import posixpath
import sys
import uuid
import warnings
from contextlib import contextmanager
from os.path import abspath
from os.path import dirname
from os.path import exists
from os.path import isabs
from os.path import isdir
from os.path import isfile
from os.path import islink
from os.path import normpath
from stat import S_ISDIR
from stat import S_ISLNK
from stat import S_ISREG
from . import error
# Moved from local.py.
iswin32 = sys.platform == "win32" or (getattr(os, '_name', False) == 'nt')
iswin32 = sys.platform == "win32" or (getattr(os, "_name", False) == "nt")
try:
# FileNotFoundError might happen in py34, and is not available with py27.
@ -29,6 +36,7 @@ except NameError:
try:
from os import fspath
except ImportError:
def fspath(path):
"""
Return the string representation of the path.
@ -47,7 +55,7 @@ except ImportError:
try:
return path_type.__fspath__(path)
except AttributeError:
if hasattr(path_type, '__fspath__'):
if hasattr(path_type, "__fspath__"):
raise
try:
import pathlib
@ -57,11 +65,13 @@ except ImportError:
if isinstance(path, pathlib.PurePath):
return str(path)
raise TypeError("expected str, bytes or os.PathLike object, not "
+ path_type.__name__)
raise TypeError(
"expected str, bytes or os.PathLike object, not " + path_type.__name__
)
class Checkers:
_depend_on_existence = 'exists', 'link', 'dir', 'file'
_depend_on_existence = "exists", "link", "dir", "file"
def __init__(self, path):
self.path = path
@ -73,11 +83,11 @@ class Checkers:
raise NotImplementedError
def dotfile(self):
return self.path.basename.startswith('.')
return self.path.basename.startswith(".")
def ext(self, arg):
if not arg.startswith('.'):
arg = '.' + arg
if not arg.startswith("."):
arg = "." + arg
return self.path.ext == arg
def exists(self):
@ -105,15 +115,14 @@ class Checkers:
try:
meth = getattr(self, name)
except AttributeError:
if name[:3] == 'not':
if name[:3] == "not":
invert = True
try:
meth = getattr(self, name[3:])
except AttributeError:
pass
if meth is None:
raise TypeError(
"no %r checker available for %r" % (name, self.path))
raise TypeError(f"no {name!r} checker available for {self.path!r}")
try:
if py.code.getrawcode(meth).co_argcount > 1:
if (not meth(value)) ^ invert:
@ -129,50 +138,58 @@ class Checkers:
if name in kw:
if kw.get(name):
return False
name = 'not' + name
name = "not" + name
if name in kw:
if not kw.get(name):
return False
return True
class NeverRaised(Exception):
pass
class PathBase(object):
class PathBase:
"""shared implementation for filesystem path objects."""
Checkers = Checkers
def __div__(self, other):
return self.join(fspath(other))
__truediv__ = __div__ # py3k
def basename(self):
"""basename part of path."""
return self._getbyspec('basename')[0]
return self._getbyspec("basename")[0]
basename = property(basename, None, None, basename.__doc__)
def dirname(self):
"""dirname part of path."""
return self._getbyspec('dirname')[0]
return self._getbyspec("dirname")[0]
dirname = property(dirname, None, None, dirname.__doc__)
def purebasename(self):
"""pure base name of the path."""
return self._getbyspec('purebasename')[0]
return self._getbyspec("purebasename")[0]
purebasename = property(purebasename, None, None, purebasename.__doc__)
def ext(self):
"""extension of the path (including the '.')."""
return self._getbyspec('ext')[0]
return self._getbyspec("ext")[0]
ext = property(ext, None, None, ext.__doc__)
def dirpath(self, *args, **kwargs):
"""return the directory path joined with any given path arguments."""
return self.new(basename='').join(*args, **kwargs)
return self.new(basename="").join(*args, **kwargs)
def read_binary(self):
"""read and return a bytestring from reading the path."""
with self.open('rb') as f:
with self.open("rb") as f:
return f.read()
def read_text(self, encoding):
@ -180,8 +197,7 @@ class PathBase(object):
with self.open("r", encoding=encoding) as f:
return f.read()
def read(self, mode='r'):
def read(self, mode="r"):
"""read and return a bytestring from reading the path."""
with self.open(mode) as f:
return f.read()
@ -189,14 +205,11 @@ class PathBase(object):
def readlines(self, cr=1):
"""read and return a list of lines from the path. if cr is False, the
newline will be removed from the end of each line."""
if sys.version_info < (3, ):
mode = 'rU'
else: # python 3 deprecates mode "U" in favor of "newline" option
mode = 'r'
mode = "r"
if not cr:
content = self.read(mode)
return content.split('\n')
return content.split("\n")
else:
f = self.open(mode)
try:
@ -206,9 +219,10 @@ newline will be removed from the end of each line. """
def load(self):
"""(deprecated) return object unpickled from self.read()"""
f = self.open('rb')
f = self.open("rb")
try:
import pickle
return error.checked_call(pickle.load, f)
finally:
f.close()
@ -216,9 +230,7 @@ newline will be removed from the end of each line. """
def move(self, target):
"""move this path to target."""
if target.relto(self):
raise error.EINVAL(
target,
"cannot move path into a subdirectory of itself")
raise error.EINVAL(target, "cannot move path into a subdirectory of itself")
try:
self.rename(target)
except error.EXDEV: # invalid cross-device link
@ -247,7 +259,7 @@ newline will be removed from the end of each line. """
path.check(file=1, link=1) # a link pointing to a file
"""
if not kw:
kw = {'exists': 1}
kw = {"exists": 1}
return self.Checkers(self)._evaluate(kw)
def fnmatch(self, pattern):
@ -274,16 +286,15 @@ newline will be removed from the end of each line. """
to the given 'relpath'.
"""
if not isinstance(relpath, (str, PathBase)):
raise TypeError("%r: not a string or path object" %(relpath,))
raise TypeError(f"{relpath!r}: not a string or path object")
strrelpath = str(relpath)
if strrelpath and strrelpath[-1] != self.sep:
strrelpath += self.sep
# assert strrelpath[-1] == self.sep
# assert strrelpath[-2] != self.sep
strself = self.strpath
if sys.platform == "win32" or getattr(os, '_name', None) == 'nt':
if os.path.normcase(strself).startswith(
os.path.normcase(strrelpath)):
if sys.platform == "win32" or getattr(os, "_name", None) == "nt":
if os.path.normcase(strself).startswith(os.path.normcase(strrelpath)):
return strself[len(strrelpath) :]
elif strself.startswith(strrelpath):
return strself[len(strrelpath) :]
@ -390,15 +401,17 @@ newline will be removed from the end of each line. """
sort if True will sort entries within each directory level.
"""
for x in Visitor(fil, rec, ignore, bf, sort).gen(self):
yield x
yield from Visitor(fil, rec, ignore, bf, sort).gen(self)
def _sortlist(self, res, sort):
if sort:
if hasattr(sort, '__call__'):
warnings.warn(DeprecationWarning(
if hasattr(sort, "__call__"):
warnings.warn(
DeprecationWarning(
"listdir(sort=callable) is deprecated and breaks on python3"
), stacklevel=3)
),
stacklevel=3,
)
res.sort(sort)
else:
res.sort()
@ -410,13 +423,14 @@ newline will be removed from the end of each line. """
def __fspath__(self):
return self.strpath
class Visitor:
def __init__(self, fil, rec, ignore, bf, sort):
if isinstance(fil, py.builtin._basestring):
fil = FNMatcher(fil)
if isinstance(rec, py.builtin._basestring):
self.rec = FNMatcher(rec)
elif not hasattr(rec, '__call__') and rec:
elif not hasattr(rec, "__call__") and rec:
self.rec = lambda path: True
else:
self.rec = rec
@ -431,8 +445,9 @@ class Visitor:
except self.ignore:
return
rec = self.rec
dirs = self.optsort([p for p in entries
if p.check(dir=1) and (rec is None or rec(p))])
dirs = self.optsort(
[p for p in entries if p.check(dir=1) and (rec is None or rec(p))]
)
if not self.breadthfirst:
for subdir in dirs:
for p in self.gen(subdir):
@ -445,6 +460,7 @@ class Visitor:
for p in self.gen(subdir):
yield p
class FNMatcher:
def __init__(self, pattern):
self.pattern = pattern
@ -452,9 +468,11 @@ class FNMatcher:
def __call__(self, path):
pattern = self.pattern
if (pattern.find(path.sep) == -1 and
iswin32 and
pattern.find(posixpath.sep) != -1):
if (
pattern.find(path.sep) == -1
and iswin32
and pattern.find(posixpath.sep) != -1
):
# Running on Windows, the pattern has no Windows path separators,
# and the pattern has one or more Posix path separators. Replace
# the Posix path separators with the Windows path separator.
@ -465,22 +483,20 @@ class FNMatcher:
else:
name = str(path) # path.strpath # XXX svn?
if not os.path.isabs(pattern):
pattern = '*' + path.sep + pattern
pattern = "*" + path.sep + pattern
return fnmatch.fnmatch(name, pattern)
if sys.version_info > (3,0):
def map_as_list(func, iter):
return list(map(func, iter))
else:
map_as_list = map
ALLOW_IMPORTLIB_MODE = sys.version_info > (3, 5)
if ALLOW_IMPORTLIB_MODE:
import importlib
class Stat(object):
class Stat:
def __getattr__(self, name):
return getattr(self._osstatresult, "st_" + name)
@ -493,6 +509,7 @@ class Stat(object):
if iswin32:
raise NotImplementedError("XXX win32")
import pwd
entry = error.checked_call(pwd.getpwuid, self.uid)
return entry[0]
@ -502,6 +519,7 @@ class Stat(object):
if iswin32:
raise NotImplementedError("XXX win32")
import grp
entry = error.checked_call(grp.getgrgid, self.gid)
return entry[0]
@ -512,9 +530,10 @@ class Stat(object):
return S_ISREG(self._osstatresult.st_mode)
def islink(self):
st = self.path.lstat()
self.path.lstat()
return S_ISLNK(self._osstatresult.st_mode)
class PosixPath(PathBase):
def chown(self, user, group, rec=0):
"""change ownership to the given user and group.
@ -548,31 +567,39 @@ class PosixPath(PathBase):
relsource = self.__class__(value).relto(base)
reldest = self.relto(base)
n = reldest.count(self.sep)
target = self.sep.join(('..', )*n + (relsource, ))
target = self.sep.join(("..",) * n + (relsource,))
error.checked_call(os.symlink, target, self.strpath)
def getuserid(user):
import pwd
if not isinstance(user, int):
user = pwd.getpwnam(user)[2]
return user
def getgroupid(group):
import grp
if not isinstance(group, int):
group = grp.getgrnam(group)[2]
return group
FSBase = not iswin32 and PosixPath or PathBase
class LocalPath(FSBase):
"""object oriented interface to os.path and other local filesystem
related information.
"""
class ImportMismatchError(ImportError):
"""raised on pyimport() if there is a mismatch of __file__'s"""
sep = os.sep
class Checkers(Checkers):
def _stat(self):
try:
@ -613,8 +640,10 @@ class LocalPath(FSBase):
try:
path = fspath(path)
except TypeError:
raise ValueError("can only pass None, Path instances "
"or non-empty strings to LocalPath")
raise ValueError(
"can only pass None, Path instances "
"or non-empty strings to LocalPath"
)
if expanduser:
path = os.path.expanduser(path)
self.strpath = abspath(path)
@ -649,8 +678,7 @@ class LocalPath(FSBase):
return fspath(self) > fspath(other)
def samefile(self, other):
""" return True if 'other' references the same file as 'self'.
"""
"""return True if 'other' references the same file as 'self'."""
other = fspath(other)
if not isabs(other):
other = abspath(other)
@ -658,8 +686,7 @@ class LocalPath(FSBase):
return True
if not hasattr(os.path, "samefile"):
return False
return error.checked_call(
os.path.samefile, self.strpath, other)
return error.checked_call(os.path.samefile, self.strpath, other)
def remove(self, rec=1, ignore_errors=False):
"""remove a file or directory (or a directory tree if rec=1).
@ -672,9 +699,10 @@ class LocalPath(FSBase):
if iswin32:
self.chmod(0o700, rec=1)
import shutil
error.checked_call(
shutil.rmtree, self.strpath,
ignore_errors=ignore_errors)
shutil.rmtree, self.strpath, ignore_errors=ignore_errors
)
else:
error.checked_call(os.rmdir, self.strpath)
else:
@ -693,8 +721,8 @@ class LocalPath(FSBase):
mod = __import__(hashtype)
hash = getattr(mod, hashtype)()
except (AttributeError, ImportError):
raise ValueError("Don't know how to compute %r hash" %(hashtype,))
f = self.open('rb')
raise ValueError(f"Don't know how to compute {hashtype!r} hash")
f = self.open("rb")
try:
while 1:
buf = f.read(chunksize)
@ -720,28 +748,28 @@ class LocalPath(FSBase):
obj.strpath = self.strpath
return obj
drive, dirname, basename, purebasename, ext = self._getbyspec(
"drive,dirname,basename,purebasename,ext")
if 'basename' in kw:
if 'purebasename' in kw or 'ext' in kw:
"drive,dirname,basename,purebasename,ext"
)
if "basename" in kw:
if "purebasename" in kw or "ext" in kw:
raise ValueError("invalid specification %r" % kw)
else:
pb = kw.setdefault('purebasename', purebasename)
pb = kw.setdefault("purebasename", purebasename)
try:
ext = kw['ext']
ext = kw["ext"]
except KeyError:
pass
else:
if ext and not ext.startswith('.'):
ext = '.' + ext
kw['basename'] = pb + ext
if ext and not ext.startswith("."):
ext = "." + ext
kw["basename"] = pb + ext
if ('dirname' in kw and not kw['dirname']):
kw['dirname'] = drive
if "dirname" in kw and not kw["dirname"]:
kw["dirname"] = drive
else:
kw.setdefault('dirname', dirname)
kw.setdefault('sep', self.sep)
obj.strpath = normpath(
"%(dirname)s%(sep)s%(basename)s" % kw)
kw.setdefault("dirname", dirname)
kw.setdefault("sep", self.sep)
obj.strpath = normpath("%(dirname)s%(sep)s%(basename)s" % kw)
return obj
def _getbyspec(self, spec):
@ -749,26 +777,26 @@ class LocalPath(FSBase):
res = []
parts = self.strpath.split(self.sep)
args = filter(None, spec.split(',') )
args = filter(None, spec.split(","))
append = res.append
for name in args:
if name == 'drive':
if name == "drive":
append(parts[0])
elif name == 'dirname':
elif name == "dirname":
append(self.sep.join(parts[:-1]))
else:
basename = parts[-1]
if name == 'basename':
if name == "basename":
append(basename)
else:
i = basename.rfind('.')
i = basename.rfind(".")
if i == -1:
purebasename, ext = basename, ''
purebasename, ext = basename, ""
else:
purebasename, ext = basename[:i], basename[i:]
if name == 'purebasename':
if name == "purebasename":
append(purebasename)
elif name == 'ext':
elif name == "ext":
append(ext)
else:
raise ValueError("invalid part specification %r" % name)
@ -782,7 +810,7 @@ class LocalPath(FSBase):
if args:
path = path.join(*args)
return path
return super(LocalPath, self).dirpath(*args, **kwargs)
return super().dirpath(*args, **kwargs)
def join(self, *args, **kwargs):
"""return a new path by appending all 'args' as path
@ -792,7 +820,7 @@ class LocalPath(FSBase):
sep = self.sep
strargs = [fspath(arg) for arg in args]
strpath = self.strpath
if kwargs.get('abs'):
if kwargs.get("abs"):
newargs = []
for arg in reversed(strargs):
if isabs(arg):
@ -806,15 +834,15 @@ class LocalPath(FSBase):
arg = arg.strip(sep)
if iswin32:
# allow unix style paths even on windows.
arg = arg.strip('/')
arg = arg.replace('/', sep)
arg = arg.strip("/")
arg = arg.replace("/", sep)
strpath = strpath + actual_sep + arg
actual_sep = sep
obj = object.__new__(self.__class__)
obj.strpath = normpath(strpath)
return obj
def open(self, mode='r', ensure=False, encoding=None):
def open(self, mode="r", ensure=False, encoding=None):
"""return an opened file with the given mode.
If ensure is True, create parent directories if needed.
@ -841,9 +869,10 @@ class LocalPath(FSBase):
return not kw["dir"] ^ isdir(self.strpath)
if "file" in kw:
return not kw["file"] ^ isfile(self.strpath)
return super(LocalPath, self).check(**kw)
return super().check(**kw)
_patternchars = set("*?[" + os.path.sep)
def listdir(self, fil=None, sort=None):
"""list directory contents, possibly filter by the given fil func
and possibly sorted.
@ -892,8 +921,10 @@ class LocalPath(FSBase):
if stat:
copystat(self, target)
else:
def rec(p):
return p.check(link=0)
for x in self.visit(rec=rec):
relpath = x.relto(self)
newx = target.join(relpath)
@ -917,8 +948,9 @@ class LocalPath(FSBase):
def dump(self, obj, bin=1):
"""pickle object into path location"""
f = self.open('wb')
f = self.open("wb")
import pickle
try:
error.checked_call(pickle.dump, obj, f, bin)
finally:
@ -936,7 +968,7 @@ class LocalPath(FSBase):
"""
if ensure:
self.dirpath().ensure(dir=1)
with self.open('wb') as f:
with self.open("wb") as f:
f.write(data)
def write_text(self, data, encoding, ensure=False):
@ -945,16 +977,16 @@ class LocalPath(FSBase):
"""
if ensure:
self.dirpath().ensure(dir=1)
with self.open('w', encoding=encoding) as f:
with self.open("w", encoding=encoding) as f:
f.write(data)
def write(self, data, mode='w', ensure=False):
def write(self, data, mode="w", ensure=False):
"""write data into path. If ensure is True create
missing parent directories.
"""
if ensure:
self.dirpath().ensure(dir=1)
if 'b' in mode:
if "b" in mode:
if not py.builtin._isbytes(data):
raise ValueError("can only process bytes")
else:
@ -991,12 +1023,12 @@ class LocalPath(FSBase):
then the path is forced to be a directory path.
"""
p = self.join(*args)
if kwargs.get('dir', 0):
if kwargs.get("dir", 0):
return p._ensuredirs()
else:
p.dirpath()._ensuredirs()
if not p.check(file=1):
p.open('w').close()
p.open("w").close()
return p
def stat(self, raising=True):
@ -1036,7 +1068,6 @@ class LocalPath(FSBase):
error.checked_call(os.chdir, self.strpath)
return old
@contextmanager
def as_cwd(self):
"""
@ -1060,7 +1091,7 @@ class LocalPath(FSBase):
return self.stat().atime
def __repr__(self):
return 'local(%r)' % self.strpath
return "local(%r)" % self.strpath
def __str__(self):
"""return string representation of the Path."""
@ -1072,7 +1103,7 @@ class LocalPath(FSBase):
if rec is True perform recursively.
"""
if not isinstance(mode, int):
raise TypeError("mode %r must be an integer" % (mode,))
raise TypeError(f"mode {mode!r} must be an integer")
if rec:
for x in self.visit(rec=rec):
error.checked_call(os.chmod, str(x), mode)
@ -1086,7 +1117,7 @@ class LocalPath(FSBase):
pkgpath = None
for parent in self.parts(reverse=True):
if parent.isdir():
if not parent.join('__init__.py').exists():
if not parent.join("__init__.py").exists():
break
if not isimportable(parent.basename):
break
@ -1127,18 +1158,15 @@ class LocalPath(FSBase):
if not self.check():
raise error.ENOENT(self)
if ensuresyspath == 'importlib':
if ensuresyspath == "importlib":
if modname is None:
modname = self.purebasename
if not ALLOW_IMPORTLIB_MODE:
raise ImportError(
"Can't use importlib due to old version of Python")
spec = importlib.util.spec_from_file_location(
modname, str(self))
raise ImportError("Can't use importlib due to old version of Python")
spec = importlib.util.spec_from_file_location(modname, str(self))
if spec is None:
raise ImportError(
"Can't find module %s at location %s" %
(modname, str(self))
f"Can't find module {modname} at location {str(self)}"
)
mod = importlib.util.module_from_spec(spec)
spec.loader.exec_module(mod)
@ -1164,10 +1192,10 @@ class LocalPath(FSBase):
return mod # we don't check anything as we might
# be in a namespace package ... too icky to check
modfile = mod.__file__
if modfile[-4:] in ('.pyc', '.pyo'):
if modfile[-4:] in (".pyc", ".pyo"):
modfile = modfile[:-1]
elif modfile.endswith('$py.class'):
modfile = modfile[:-9] + '.py'
elif modfile.endswith("$py.class"):
modfile = modfile[:-9] + ".py"
if modfile.endswith(os.path.sep + "__init__.py"):
if self.basename != "__init__.py":
modfile = modfile[:-12]
@ -1176,8 +1204,8 @@ class LocalPath(FSBase):
except error.ENOENT:
issame = False
if not issame:
ignore = os.getenv('PY_IGNORE_IMPORTMISMATCH')
if ignore != '1':
ignore = os.getenv("PY_IGNORE_IMPORTMISMATCH")
if ignore != "1":
raise self.ImportMismatchError(modname, modfile, self)
return mod
else:
@ -1186,6 +1214,7 @@ class LocalPath(FSBase):
except KeyError:
# we have a custom modname, do a pseudo-import
import types
mod = types.ModuleType(modname)
mod.__file__ = str(self)
sys.modules[modname] = mod
@ -1202,8 +1231,9 @@ class LocalPath(FSBase):
The process is directly invoked and not through a system shell.
"""
from subprocess import Popen, PIPE
argv = map_as_list(str, argv)
popen_opts['stdout'] = popen_opts['stderr'] = PIPE
popen_opts["stdout"] = popen_opts["stderr"] = PIPE
proc = Popen([str(self)] + argv, **popen_opts)
stdout, stderr = proc.communicate()
ret = proc.wait()
@ -1212,8 +1242,13 @@ class LocalPath(FSBase):
if ret != 0:
if py.builtin._isbytes(stderr):
stderr = py.builtin._totext(stderr, sys.getdefaultencoding())
raise py.process.cmdexec.Error(ret, ret, str(self),
stdout, stderr,)
raise py.process.cmdexec.Error(
ret,
ret,
str(self),
stdout,
stderr,
)
return stdout
def sysfind(cls, name, checker=None, paths=None):
@ -1231,21 +1266,22 @@ class LocalPath(FSBase):
else:
if paths is None:
if iswin32:
paths = os.environ['Path'].split(';')
if '' not in paths and '.' not in paths:
paths.append('.')
paths = os.environ["Path"].split(";")
if "" not in paths and "." not in paths:
paths.append(".")
try:
systemroot = os.environ['SYSTEMROOT']
systemroot = os.environ["SYSTEMROOT"]
except KeyError:
pass
else:
paths = [path.replace('%SystemRoot%', systemroot)
for path in paths]
paths = [
path.replace("%SystemRoot%", systemroot) for path in paths
]
else:
paths = os.environ['PATH'].split(':')
paths = os.environ["PATH"].split(":")
tryadd = []
if iswin32:
tryadd += os.environ['PATHEXT'].split(os.pathsep)
tryadd += os.environ["PATHEXT"].split(os.pathsep)
tryadd.append("")
for x in paths:
@ -1260,17 +1296,19 @@ class LocalPath(FSBase):
except error.EACCES:
pass
return None
sysfind = classmethod(sysfind)
def _gethomedir(cls):
try:
x = os.environ['HOME']
x = os.environ["HOME"]
except KeyError:
try:
x = os.environ["HOMEDRIVE"] + os.environ['HOMEPATH']
x = os.environ["HOMEDRIVE"] + os.environ["HOMEPATH"]
except KeyError:
return None
return cls(x)
_gethomedir = classmethod(_gethomedir)
# """
@ -1282,6 +1320,7 @@ class LocalPath(FSBase):
(where tempfiles are usually created in)
"""
import tempfile
return py.path.local(tempfile.gettempdir())
@classmethod
@ -1290,12 +1329,14 @@ class LocalPath(FSBase):
(which we created ourself).
"""
import tempfile
if rootdir is None:
rootdir = cls.get_temproot()
return cls(error.checked_call(tempfile.mkdtemp, dir=str(rootdir)))
def make_numbered_dir(cls, prefix='session-', rootdir=None, keep=3,
lock_timeout=172800): # two days
def make_numbered_dir(
cls, prefix="session-", rootdir=None, keep=3, lock_timeout=172800
): # two days
"""return unique directory with a number greater than the current
maximum one. The number is assumed to start directly after prefix.
if keep is true directories with a number less than (maxnum-keep)
@ -1306,6 +1347,7 @@ class LocalPath(FSBase):
rootdir = cls.get_temproot()
nprefix = prefix.lower()
def parse_num(path):
"""parse the number out of a path (if it matches the prefix)"""
nbasename = path.basename.lower()
@ -1318,18 +1360,21 @@ class LocalPath(FSBase):
def create_lockfile(path):
"""exclusively create lockfile. Throws when failed"""
mypid = os.getpid()
lockfile = path.join('.lock')
if hasattr(lockfile, 'mksymlinkto'):
lockfile = path.join(".lock")
if hasattr(lockfile, "mksymlinkto"):
lockfile.mksymlinkto(str(mypid))
else:
fd = error.checked_call(os.open, str(lockfile), os.O_WRONLY | os.O_CREAT | os.O_EXCL, 0o644)
with os.fdopen(fd, 'w') as f:
fd = error.checked_call(
os.open, str(lockfile), os.O_WRONLY | os.O_CREAT | os.O_EXCL, 0o644
)
with os.fdopen(fd, "w") as f:
f.write(str(mypid))
return lockfile
def atexit_remove_lockfile(lockfile):
"""ensure lockfile is removed at process exit"""
mypid = os.getpid()
def try_remove_lockfile():
# in a fork() situation, only the last process should
# remove the .lock, otherwise the other processes run the
@ -1342,6 +1387,7 @@ class LocalPath(FSBase):
lockfile.remove()
except error.Error:
pass
atexit.register(try_remove_lockfile)
# compute the maximum number currently in use with the prefix
@ -1381,7 +1427,7 @@ class LocalPath(FSBase):
except error.Error:
pass
garbage_prefix = prefix + 'garbage-'
garbage_prefix = prefix + "garbage-"
def is_garbage(path):
"""check if path denotes directory scheduled for removal"""
@ -1429,15 +1475,15 @@ class LocalPath(FSBase):
# make link...
try:
username = os.environ['USER'] #linux, et al
username = os.environ["USER"] # linux, et al
except KeyError:
try:
username = os.environ['USERNAME'] #windows
username = os.environ["USERNAME"] # windows
except KeyError:
username = 'current'
username = "current"
src = str(udir)
dest = src[:src.rfind('-')] + '-' + username
dest = src[: src.rfind("-")] + "-" + username
try:
os.unlink(dest)
except OSError:
@ -1448,12 +1494,14 @@ class LocalPath(FSBase):
pass
return udir
make_numbered_dir = classmethod(make_numbered_dir)
def copymode(src, dest):
"""copy permission from src to dst."""
import shutil
shutil.copymode(src, dest)
@ -1461,14 +1509,15 @@ def copystat(src, dest):
"""copy permission, last modification time,
last access time, and flags from src to dst."""
import shutil
shutil.copystat(str(src), str(dest))
def copychunked(src, dest):
chunksize = 524288 # half a meg of bytes
fsrc = src.open('rb')
fsrc = src.open("rb")
try:
fdest = dest.open('wb')
fdest = dest.open("wb")
try:
while 1:
buf = fsrc.read(chunksize)
@ -1482,8 +1531,9 @@ def copychunked(src, dest):
def isimportable(name):
if name and (name[0].isalpha() or name[0] == '_'):
name = name.replace("_", '')
if name and (name[0].isalpha() or name[0] == "_"):
name = name.replace("_", "")
return not name or name.isalnum()
local = LocalPath

View File

@ -18,6 +18,7 @@ from typing import TypeVar
from typing import Union
import attr
import py
# fmt: off

View File

@ -6,5 +6,5 @@ import sys
import _pytest._py.error as error
import _pytest._py.path as path
sys.modules['py.error'] = error
sys.modules['py.path'] = path
sys.modules["py.error"] = error
sys.modules["py.path"] = path