py36+: com2ann

This commit is contained in:
Anthony Sottile
2020-10-05 18:13:05 -07:00
parent 703e89134c
commit 33d119f71a
62 changed files with 431 additions and 443 deletions

View File

@@ -103,7 +103,7 @@ if os.environ.get("_ARGCOMPLETE"):
import argcomplete.completers
except ImportError:
sys.exit(-1)
filescompleter = FastFilesCompleter() # type: Optional[FastFilesCompleter]
filescompleter: Optional[FastFilesCompleter] = FastFilesCompleter()
def try_argcomplete(parser: argparse.ArgumentParser) -> None:
argcomplete.autocomplete(parser, always_complete_options=False)

View File

@@ -164,7 +164,7 @@ class Frame:
class TracebackEntry:
"""A single entry in a Traceback."""
_repr_style = None # type: Optional[Literal["short", "long"]]
_repr_style: Optional['Literal["short", "long"]'] = None
exprinfo = None
def __init__(
@@ -246,9 +246,9 @@ class TracebackEntry:
Mostly for internal use.
"""
tbh = (
tbh: Union[bool, Callable[[Optional[ExceptionInfo[BaseException]]], bool]] = (
False
) # type: Union[bool, Callable[[Optional[ExceptionInfo[BaseException]]], bool]]
)
for maybe_ns_dct in (self.frame.f_locals, self.frame.f_globals):
# in normal cases, f_locals and f_globals are dictionaries
# however via `exec(...)` / `eval(...)` they can be other types
@@ -301,7 +301,7 @@ class Traceback(List[TracebackEntry]):
if isinstance(tb, TracebackType):
def f(cur: TracebackType) -> Iterable[TracebackEntry]:
cur_ = cur # type: Optional[TracebackType]
cur_: Optional[TracebackType] = cur
while cur_ is not None:
yield TracebackEntry(cur_, excinfo=excinfo)
cur_ = cur_.tb_next
@@ -381,7 +381,7 @@ class Traceback(List[TracebackEntry]):
def recursionindex(self) -> Optional[int]:
"""Return the index of the frame/TracebackEntry where recursion originates if
appropriate, None if no recursion occurred."""
cache = {} # type: Dict[Tuple[Any, int, int], List[Dict[str, Any]]]
cache: Dict[Tuple[Any, int, int], List[Dict[str, Any]]] = {}
for i, entry in enumerate(self):
# id for the code.raw is needed to work around
# the strange metaprogramming in the decorator lib from pypi
@@ -760,7 +760,7 @@ class FormattedExcinfo:
entry: TracebackEntry,
excinfo: Optional[ExceptionInfo[BaseException]] = None,
) -> "ReprEntry":
lines = [] # type: List[str]
lines: List[str] = []
style = entry._repr_style if entry._repr_style is not None else self.style
if style in ("short", "long"):
source = self._getentrysource(entry)
@@ -842,7 +842,7 @@ class FormattedExcinfo:
recursionindex = traceback.recursionindex()
except Exception as e:
max_frames = 10
extraline = (
extraline: Optional[str] = (
"!!! Recursion error detected, but an error occurred locating the origin of recursion.\n"
" The following exception happened when comparing locals in the stack frame:\n"
" {exc_type}: {exc_msg}\n"
@@ -852,7 +852,7 @@ class FormattedExcinfo:
exc_msg=str(e),
max_frames=max_frames,
total=len(traceback),
) # type: Optional[str]
)
# Type ignored because adding two instaces of a List subtype
# currently incorrectly has type List instead of the subtype.
traceback = traceback[:max_frames] + traceback[-max_frames:] # type: ignore
@@ -868,20 +868,20 @@ class FormattedExcinfo:
def repr_excinfo(
self, excinfo: ExceptionInfo[BaseException]
) -> "ExceptionChainRepr":
repr_chain = (
[]
) # type: List[Tuple[ReprTraceback, Optional[ReprFileLocation], Optional[str]]]
e = excinfo.value # type: Optional[BaseException]
excinfo_ = excinfo # type: Optional[ExceptionInfo[BaseException]]
repr_chain: List[
Tuple[ReprTraceback, Optional[ReprFileLocation], Optional[str]]
] = []
e: Optional[BaseException] = excinfo.value
excinfo_: Optional[ExceptionInfo[BaseException]] = excinfo
descr = None
seen = set() # type: Set[int]
seen: Set[int] = set()
while e is not None and id(e) not in seen:
seen.add(id(e))
if excinfo_:
reprtraceback = self.repr_traceback(excinfo_)
reprcrash = (
reprcrash: Optional[ReprFileLocation] = (
excinfo_._getreprcrash() if self.style != "value" else None
) # type: Optional[ReprFileLocation]
)
else:
# Fallback to native repr if the exception doesn't have a traceback:
# ExceptionInfo objects require a full traceback to work.
@@ -936,11 +936,11 @@ class TerminalRepr:
@attr.s(eq=False)
class ExceptionRepr(TerminalRepr):
# Provided by subclasses.
reprcrash = None # type: Optional[ReprFileLocation]
reprtraceback = None # type: ReprTraceback
reprcrash: Optional["ReprFileLocation"]
reprtraceback: "ReprTraceback"
def __attrs_post_init__(self) -> None:
self.sections = [] # type: List[Tuple[str, str, str]]
self.sections: List[Tuple[str, str, str]] = []
def addsection(self, name: str, content: str, sep: str = "-") -> None:
self.sections.append((name, content, sep))
@@ -1022,7 +1022,7 @@ class ReprTracebackNative(ReprTraceback):
@attr.s(eq=False)
class ReprEntryNative(TerminalRepr):
lines = attr.ib(type=Sequence[str])
style = "native" # type: _TracebackStyle
style: "_TracebackStyle" = "native"
def toterminal(self, tw: TerminalWriter) -> None:
tw.write("".join(self.lines))
@@ -1058,9 +1058,9 @@ class ReprEntry(TerminalRepr):
# such as "> assert 0"
fail_marker = f"{FormattedExcinfo.fail_marker} "
indent_size = len(fail_marker)
indents = [] # type: List[str]
source_lines = [] # type: List[str]
failure_lines = [] # type: List[str]
indents: List[str] = []
source_lines: List[str] = []
failure_lines: List[str] = []
for index, line in enumerate(self.lines):
is_failure_line = line.startswith(fail_marker)
if is_failure_line:

View File

@@ -21,7 +21,7 @@ class Source:
def __init__(self, obj: object = None) -> None:
if not obj:
self.lines = [] # type: List[str]
self.lines: List[str] = []
elif isinstance(obj, Source):
self.lines = obj.lines
elif isinstance(obj, (tuple, list)):
@@ -144,12 +144,12 @@ def deindent(lines: Iterable[str]) -> List[str]:
def get_statement_startend2(lineno: int, node: ast.AST) -> Tuple[int, Optional[int]]:
# Flatten all statements and except handlers into one lineno-list.
# AST's line numbers start indexing at 1.
values = [] # type: List[int]
values: List[int] = []
for x in ast.walk(node):
if isinstance(x, (ast.stmt, ast.ExceptHandler)):
values.append(x.lineno - 1)
for name in ("finalbody", "orelse"):
val = getattr(x, name, None) # type: Optional[List[ast.stmt]]
val: Optional[List[ast.stmt]] = getattr(x, name, None)
if val:
# Treat the finally/orelse part as its own statement.
values.append(val[0].lineno - 1 - 1)

View File

@@ -76,7 +76,7 @@ class TerminalWriter:
self._file = file
self.hasmarkup = should_do_markup(file)
self._current_line = ""
self._terminal_width = None # type: Optional[int]
self._terminal_width: Optional[int] = None
self.code_highlight = True
@property
@@ -204,7 +204,7 @@ class TerminalWriter:
except ImportError:
return source
else:
highlighted = highlight(
highlighted: str = highlight(
source, PythonLexer(), TerminalFormatter(bg="dark")
) # type: str
)
return highlighted

View File

@@ -83,7 +83,7 @@ class AssertionState:
def __init__(self, config: Config, mode) -> None:
self.mode = mode
self.trace = config.trace.root.get("assertion")
self.hook = None # type: Optional[rewrite.AssertionRewritingHook]
self.hook: Optional[rewrite.AssertionRewritingHook] = None
def install_importhook(config: Config) -> rewrite.AssertionRewritingHook:

View File

@@ -62,14 +62,14 @@ class AssertionRewritingHook(importlib.abc.MetaPathFinder, importlib.abc.Loader)
self.fnpats = config.getini("python_files")
except ValueError:
self.fnpats = ["test_*.py", "*_test.py"]
self.session = None # type: Optional[Session]
self._rewritten_names = set() # type: Set[str]
self._must_rewrite = set() # type: Set[str]
self.session: Optional[Session] = None
self._rewritten_names: Set[str] = set()
self._must_rewrite: Set[str] = set()
# flag to guard against trying to rewrite a pyc file while we are already writing another pyc file,
# which might result in infinite recursion (#3506)
self._writing_pyc = False
self._basenames_to_check_rewrite = {"conftest"}
self._marked_for_rewrite_cache = {} # type: Dict[str, bool]
self._marked_for_rewrite_cache: Dict[str, bool] = {}
self._session_paths_checked = False
def set_session(self, session: Optional[Session]) -> None:
@@ -529,12 +529,12 @@ def set_location(node, lineno, col_offset):
def _get_assertion_exprs(src: bytes) -> Dict[int, str]:
"""Return a mapping from {lineno: "assertion test expression"}."""
ret = {} # type: Dict[int, str]
ret: Dict[int, str] = {}
depth = 0
lines = [] # type: List[str]
assert_lineno = None # type: Optional[int]
seen_lines = set() # type: Set[int]
lines: List[str] = []
assert_lineno: Optional[int] = None
seen_lines: Set[int] = set()
def _write_and_reset() -> None:
nonlocal depth, lines, assert_lineno, seen_lines
@@ -699,12 +699,12 @@ class AssertionRewriter(ast.NodeVisitor):
]
mod.body[pos:pos] = imports
# Collect asserts.
nodes = [mod] # type: List[ast.AST]
nodes: List[ast.AST] = [mod]
while nodes:
node = nodes.pop()
for name, field in ast.iter_fields(node):
if isinstance(field, list):
new = [] # type: List[ast.AST]
new: List[ast.AST] = []
for i, child in enumerate(field):
if isinstance(child, ast.Assert):
# Transform assert.
@@ -776,7 +776,7 @@ class AssertionRewriter(ast.NodeVisitor):
to format a string of %-formatted values as added by
.explanation_param().
"""
self.explanation_specifiers = {} # type: Dict[str, ast.expr]
self.explanation_specifiers: Dict[str, ast.expr] = {}
self.stack.append(self.explanation_specifiers)
def pop_format_context(self, expl_expr: ast.expr) -> ast.Name:
@@ -828,15 +828,15 @@ class AssertionRewriter(ast.NodeVisitor):
lineno=assert_.lineno,
)
self.statements = [] # type: List[ast.stmt]
self.variables = [] # type: List[str]
self.statements: List[ast.stmt] = []
self.variables: List[str] = []
self.variable_counter = itertools.count()
if self.enable_assertion_pass_hook:
self.format_variables = [] # type: List[str]
self.format_variables: List[str] = []
self.stack = [] # type: List[Dict[str, ast.expr]]
self.expl_stmts = [] # type: List[ast.stmt]
self.stack: List[Dict[str, ast.expr]] = []
self.expl_stmts: List[ast.stmt] = []
self.push_format_context()
# Rewrite assert into a bunch of statements.
top_condition, explanation = self.visit(assert_.test)
@@ -943,7 +943,7 @@ class AssertionRewriter(ast.NodeVisitor):
# Process each operand, short-circuiting if needed.
for i, v in enumerate(boolop.values):
if i:
fail_inner = [] # type: List[ast.stmt]
fail_inner: List[ast.stmt] = []
# cond is set in a prior loop iteration below
self.expl_stmts.append(ast.If(cond, fail_inner, [])) # noqa
self.expl_stmts = fail_inner
@@ -954,10 +954,10 @@ class AssertionRewriter(ast.NodeVisitor):
call = ast.Call(app, [expl_format], [])
self.expl_stmts.append(ast.Expr(call))
if i < levels:
cond = res # type: ast.expr
cond: ast.expr = res
if is_or:
cond = ast.UnaryOp(ast.Not(), cond)
inner = [] # type: List[ast.stmt]
inner: List[ast.stmt] = []
self.statements.append(ast.If(cond, inner, []))
self.statements = body = inner
self.statements = save
@@ -1053,7 +1053,7 @@ class AssertionRewriter(ast.NodeVisitor):
ast.Tuple(results, ast.Load()),
)
if len(comp.ops) > 1:
res = ast.BoolOp(ast.And(), load_names) # type: ast.expr
res: ast.expr = ast.BoolOp(ast.And(), load_names)
else:
res = load_names[0]
return res, self.explanation_param(self.pop_format_context(expl_call))

View File

@@ -21,11 +21,11 @@ from _pytest._io.saferepr import saferepr
# interpretation code and assertion rewriter to detect this plugin was
# loaded and in turn call the hooks defined here as part of the
# DebugInterpreter.
_reprcompare = None # type: Optional[Callable[[str, object, object], Optional[str]]]
_reprcompare: Optional[Callable[[str, object, object], Optional[str]]] = None
# Works similarly as _reprcompare attribute. Is populated with the hook call
# when pytest_runtest_setup is called.
_assertion_pass = None # type: Optional[Callable[[int, str, str], None]]
_assertion_pass: Optional[Callable[[int, str, str], None]] = None
def format_explanation(explanation: str) -> str:
@@ -197,7 +197,7 @@ def _diff_text(left: str, right: str, verbose: int = 0) -> List[str]:
"""
from difflib import ndiff
explanation = [] # type: List[str]
explanation: List[str] = []
if verbose < 1:
i = 0 # just in case left or right has zero length
@@ -242,7 +242,7 @@ def _compare_eq_verbose(left: Any, right: Any) -> List[str]:
left_lines = repr(left).splitlines(keepends)
right_lines = repr(right).splitlines(keepends)
explanation = [] # type: List[str]
explanation: List[str] = []
explanation += ["+" + line for line in left_lines]
explanation += ["-" + line for line in right_lines]
@@ -296,7 +296,7 @@ def _compare_eq_sequence(
left: Sequence[Any], right: Sequence[Any], verbose: int = 0
) -> List[str]:
comparing_bytes = isinstance(left, bytes) and isinstance(right, bytes)
explanation = [] # type: List[str]
explanation: List[str] = []
len_left = len(left)
len_right = len(right)
for i in range(min(len_left, len_right)):
@@ -365,7 +365,7 @@ def _compare_eq_set(
def _compare_eq_dict(
left: Mapping[Any, Any], right: Mapping[Any, Any], verbose: int = 0
) -> List[str]:
explanation = [] # type: List[str]
explanation: List[str] = []
set_left = set(left)
set_right = set(right)
common = set_left.intersection(set_right)

View File

@@ -186,7 +186,7 @@ class LFPluginCollWrapper:
def pytest_make_collect_report(self, collector: nodes.Collector):
if isinstance(collector, Session):
out = yield
res = out.get_result() # type: CollectReport
res: CollectReport = out.get_result()
# Sort any lf-paths to the beginning.
lf_paths = self.lfplugin._last_failed_paths
@@ -251,11 +251,9 @@ class LFPlugin:
active_keys = "lf", "failedfirst"
self.active = any(config.getoption(key) for key in active_keys)
assert config.cache
self.lastfailed = config.cache.get(
"cache/lastfailed", {}
) # type: Dict[str, bool]
self._previously_failed_count = None # type: Optional[int]
self._report_status = None # type: Optional[str]
self.lastfailed: Dict[str, bool] = config.cache.get("cache/lastfailed", {})
self._previously_failed_count: Optional[int] = None
self._report_status: Optional[str] = None
self._skipped_files = 0 # count skipped files during collection due to --lf
if config.getoption("lf"):
@@ -369,8 +367,8 @@ class NFPlugin:
yield
if self.active:
new_items = order_preserving_dict() # type: Dict[str, nodes.Item]
other_items = order_preserving_dict() # type: Dict[str, nodes.Item]
new_items: Dict[str, nodes.Item] = order_preserving_dict()
other_items: Dict[str, nodes.Item] = order_preserving_dict()
for item in items:
if item.nodeid not in self.cached_nodeids:
new_items[item.nodeid] = item

View File

@@ -369,9 +369,7 @@ class FDCaptureBinary:
# Further complications are the need to support suspend() and the
# possibility of FD reuse (e.g. the tmpfile getting the very same
# target FD). The following approach is robust, I believe.
self.targetfd_invalid = os.open(
os.devnull, os.O_RDWR
) # type: Optional[int]
self.targetfd_invalid: Optional[int] = os.open(os.devnull, os.O_RDWR)
os.dup2(self.targetfd_invalid, targetfd)
else:
self.targetfd_invalid = None
@@ -505,8 +503,8 @@ class CaptureResult(Generic[AnyStr]):
__slots__ = ("out", "err")
def __init__(self, out: AnyStr, err: AnyStr) -> None:
self.out = out # type: AnyStr
self.err = err # type: AnyStr
self.out: AnyStr = out
self.err: AnyStr = err
def __len__(self) -> int:
return 2
@@ -665,8 +663,8 @@ class CaptureManager:
def __init__(self, method: "_CaptureMethod") -> None:
self._method = method
self._global_capturing = None # type: Optional[MultiCapture[str]]
self._capture_fixture = None # type: Optional[CaptureFixture[Any]]
self._global_capturing: Optional[MultiCapture[str]] = None
self._capture_fixture: Optional[CaptureFixture[Any]] = None
def __repr__(self) -> str:
return "<CaptureManager _method={!r} _global_capturing={!r} _capture_fixture={!r}>".format(
@@ -835,7 +833,7 @@ class CaptureFixture(Generic[AnyStr]):
def __init__(self, captureclass, request: SubRequest) -> None:
self.captureclass = captureclass
self.request = request
self._capture = None # type: Optional[MultiCapture[AnyStr]]
self._capture: Optional[MultiCapture[AnyStr]] = None
self._captured_out = self.captureclass.EMPTY_BUFFER
self._captured_err = self.captureclass.EMPTY_BUFFER

View File

@@ -36,7 +36,7 @@ _S = TypeVar("_S")
# https://www.python.org/dev/peps/pep-0484/#support-for-singleton-types-in-unions
class NotSetType(enum.Enum):
token = 0
NOTSET = NotSetType.token # type: Final # noqa: E305
NOTSET: "Final" = NotSetType.token # noqa: E305
# fmt: on
if sys.version_info >= (3, 8):

View File

@@ -159,9 +159,9 @@ def main(
return ExitCode.USAGE_ERROR
else:
try:
ret = config.hook.pytest_cmdline_main(
ret: Union[ExitCode, int] = config.hook.pytest_cmdline_main(
config=config
) # type: Union[ExitCode, int]
)
try:
return ExitCode(ret)
except ValueError:
@@ -337,27 +337,27 @@ class PytestPluginManager(PluginManager):
super().__init__("pytest")
# The objects are module objects, only used generically.
self._conftest_plugins = set() # type: Set[types.ModuleType]
self._conftest_plugins: Set[types.ModuleType] = set()
# State related to local conftest plugins.
self._dirpath2confmods = {} # type: Dict[py.path.local, List[types.ModuleType]]
self._conftestpath2mod = {} # type: Dict[Path, types.ModuleType]
self._confcutdir = None # type: Optional[py.path.local]
self._dirpath2confmods: Dict[py.path.local, List[types.ModuleType]] = {}
self._conftestpath2mod: Dict[Path, types.ModuleType] = {}
self._confcutdir: Optional[py.path.local] = None
self._noconftest = False
self._duplicatepaths = set() # type: Set[py.path.local]
self._duplicatepaths: Set[py.path.local] = set()
# plugins that were explicitly skipped with pytest.skip
# list of (module name, skip reason)
# previously we would issue a warning when a plugin was skipped, but
# since we refactored warnings as first citizens of Config, they are
# just stored here to be used later.
self.skipped_plugins = [] # type: List[Tuple[str, str]]
self.skipped_plugins: List[Tuple[str, str]] = []
self.add_hookspecs(_pytest.hookspec)
self.register(self)
if os.environ.get("PYTEST_DEBUG"):
err = sys.stderr # type: IO[str]
encoding = getattr(err, "encoding", "utf8") # type: str
err: IO[str] = sys.stderr
encoding: str = getattr(err, "encoding", "utf8")
try:
err = open(
os.dup(err.fileno()), mode=err.mode, buffering=1, encoding=encoding,
@@ -431,7 +431,7 @@ class PytestPluginManager(PluginManager):
)
)
return None
ret = super().register(plugin, name) # type: Optional[str]
ret: Optional[str] = super().register(plugin, name)
if ret:
self.hook.pytest_plugin_registered.call_historic(
kwargs=dict(plugin=plugin, manager=self)
@@ -443,7 +443,7 @@ class PytestPluginManager(PluginManager):
def getplugin(self, name: str):
# Support deprecated naming because plugins (xdist e.g.) use it.
plugin = self.get_plugin(name) # type: Optional[_PluggyPlugin]
plugin: Optional[_PluggyPlugin] = self.get_plugin(name)
return plugin
def hasplugin(self, name: str) -> bool:
@@ -898,10 +898,10 @@ class Config:
self.trace = self.pluginmanager.trace.root.get("config")
self.hook = self.pluginmanager.hook
self._inicache = {} # type: Dict[str, Any]
self._override_ini = () # type: Sequence[str]
self._opt2dest = {} # type: Dict[str, str]
self._cleanup = [] # type: List[Callable[[], None]]
self._inicache: Dict[str, Any] = {}
self._override_ini: Sequence[str] = ()
self._opt2dest: Dict[str, str] = {}
self._cleanup: List[Callable[[], None]] = []
# A place where plugins can store information on the config for their
# own use. Currently only intended for internal plugins.
self._store = Store()
@@ -914,7 +914,7 @@ class Config:
if TYPE_CHECKING:
from _pytest.cacheprovider import Cache
self.cache = None # type: Optional[Cache]
self.cache: Optional[Cache] = None
@property
def invocation_dir(self) -> py.path.local:
@@ -989,9 +989,9 @@ class Config:
fin()
def get_terminal_writer(self) -> TerminalWriter:
terminalreporter = self.pluginmanager.get_plugin(
terminalreporter: TerminalReporter = self.pluginmanager.get_plugin(
"terminalreporter"
) # type: TerminalReporter
)
return terminalreporter._tw
def pytest_cmdline_parse(
@@ -1026,7 +1026,7 @@ class Config:
option: Optional[argparse.Namespace] = None,
) -> None:
if option and getattr(option, "fulltrace", False):
style = "long" # type: _TracebackStyle
style: _TracebackStyle = "long"
else:
style = "native"
excrepr = excinfo.getrepr(
@@ -1415,7 +1415,7 @@ class Config:
except KeyError:
return None
modpath = py.path.local(mod.__file__).dirpath()
values = [] # type: List[py.path.local]
values: List[py.path.local] = []
for relroot in relroots:
if not isinstance(relroot, py.path.local):
relroot = relroot.replace("/", os.sep)
@@ -1568,7 +1568,7 @@ def parse_warning_filter(
while len(parts) < 5:
parts.append("")
action_, message, category_, module, lineno_ = [s.strip() for s in parts]
action = warnings._getaction(action_) # type: str # type: ignore[attr-defined]
action: str = warnings._getaction(action_) # type: ignore[attr-defined]
category: Type[Warning] = warnings._getcategory(category_) # type: ignore[attr-defined]
if message and escape:
message = re.escape(message)

View File

@@ -35,7 +35,7 @@ class Parser:
there's an error processing the command line arguments.
"""
prog = None # type: Optional[str]
prog: Optional[str] = None
def __init__(
self,
@@ -43,12 +43,12 @@ class Parser:
processopt: Optional[Callable[["Argument"], None]] = None,
) -> None:
self._anonymous = OptionGroup("custom options", parser=self)
self._groups = [] # type: List[OptionGroup]
self._groups: List[OptionGroup] = []
self._processopt = processopt
self._usage = usage
self._inidict = {} # type: Dict[str, Tuple[str, Optional[str], Any]]
self._ininames = [] # type: List[str]
self.extra_info = {} # type: Dict[str, Any]
self._inidict: Dict[str, Tuple[str, Optional[str], Any]] = {}
self._ininames: List[str] = []
self.extra_info: Dict[str, Any] = {}
def processoption(self, option: "Argument") -> None:
if self._processopt:
@@ -207,8 +207,8 @@ class Argument:
def __init__(self, *names: str, **attrs: Any) -> None:
"""Store parms in private vars for use in add_argument."""
self._attrs = attrs
self._short_opts = [] # type: List[str]
self._long_opts = [] # type: List[str]
self._short_opts: List[str] = []
self._long_opts: List[str] = []
if "%default" in (attrs.get("help") or ""):
warnings.warn(
'pytest now uses argparse. "%default" should be'
@@ -254,7 +254,7 @@ class Argument:
except KeyError:
pass
self._set_opt_strings(names)
dest = attrs.get("dest") # type: Optional[str]
dest: Optional[str] = attrs.get("dest")
if dest:
self.dest = dest
elif self._long_opts:
@@ -315,7 +315,7 @@ class Argument:
self._long_opts.append(opt)
def __repr__(self) -> str:
args = [] # type: List[str]
args: List[str] = []
if self._short_opts:
args += ["_short_opts: " + repr(self._short_opts)]
if self._long_opts:
@@ -334,7 +334,7 @@ class OptionGroup:
) -> None:
self.name = name
self.description = description
self.options = [] # type: List[Argument]
self.options: List[Argument] = []
self.parser = parser
def addoption(self, *optnames: str, **attrs: Any) -> None:
@@ -472,9 +472,7 @@ class DropShorterLongHelpFormatter(argparse.HelpFormatter):
orgstr = argparse.HelpFormatter._format_action_invocation(self, action)
if orgstr and orgstr[0] != "-": # only optional arguments
return orgstr
res = getattr(
action, "_formatted_action_invocation", None
) # type: Optional[str]
res: Optional[str] = getattr(action, "_formatted_action_invocation", None)
if res:
return res
options = orgstr.split(", ")
@@ -483,7 +481,7 @@ class DropShorterLongHelpFormatter(argparse.HelpFormatter):
action._formatted_action_invocation = orgstr # type: ignore
return orgstr
return_list = []
short_long = {} # type: Dict[str, str]
short_long: Dict[str, str] = {}
for option in options:
if len(option) == 2 or option[2] == " ":
continue

View File

@@ -110,7 +110,7 @@ def locate_config(
def get_common_ancestor(paths: Iterable[Path]) -> Path:
common_ancestor = None # type: Optional[Path]
common_ancestor: Optional[Path] = None
for path in paths:
if not path.exists():
continue
@@ -175,7 +175,7 @@ def determine_setup(
dirs = get_dirs_from_args(args)
if inifile:
inipath_ = absolutepath(inifile)
inipath = inipath_ # type: Optional[Path]
inipath: Optional[Path] = inipath_
inicfg = load_config_dict_from_file(inipath_) or {}
if rootdir_cmd_arg is None:
rootdir = get_common_ancestor(dirs)

View File

@@ -94,13 +94,13 @@ def pytest_configure(config: Config) -> None:
class pytestPDB:
"""Pseudo PDB that defers to the real pdb."""
_pluginmanager = None # type: Optional[PytestPluginManager]
_config = None # type: Config
_saved = (
[]
) # type: List[Tuple[Callable[..., None], Optional[PytestPluginManager], Config]]
_pluginmanager: Optional[PytestPluginManager] = None
_config: Optional[Config] = None
_saved: List[
Tuple[Callable[..., None], Optional[PytestPluginManager], Optional[Config]]
] = []
_recursive_debug = 0
_wrapped_pdb_cls = None # type: Optional[Tuple[Type[Any], Type[Any]]]
_wrapped_pdb_cls: Optional[Tuple[Type[Any], Type[Any]]] = None
@classmethod
def _is_capturing(cls, capman: Optional["CaptureManager"]) -> Union[str, bool]:
@@ -166,6 +166,7 @@ class pytestPDB:
def do_continue(self, arg):
ret = super().do_continue(arg)
if cls._recursive_debug == 0:
assert cls._config is not None
tw = _pytest.config.create_terminal_writer(cls._config)
tw.line()
@@ -239,7 +240,7 @@ class pytestPDB:
import _pytest.config
if cls._pluginmanager is None:
capman = None # type: Optional[CaptureManager]
capman: Optional[CaptureManager] = None
else:
capman = cls._pluginmanager.getplugin("capturemanager")
if capman:

View File

@@ -59,7 +59,7 @@ DOCTEST_REPORT_CHOICES = (
# Lazy definition of runner class
RUNNER_CLASS = None
# Lazy definition of output checker class
CHECKER_CLASS = None # type: Optional[Type[doctest.OutputChecker]]
CHECKER_CLASS: Optional[Type["doctest.OutputChecker"]] = None
def pytest_addoption(parser: Parser) -> None:
@@ -124,10 +124,10 @@ def pytest_collect_file(
config = parent.config
if path.ext == ".py":
if config.option.doctestmodules and not _is_setup_py(path):
mod = DoctestModule.from_parent(parent, fspath=path) # type: DoctestModule
mod: DoctestModule = DoctestModule.from_parent(parent, fspath=path)
return mod
elif _is_doctest(config, path, parent):
txt = DoctestTextfile.from_parent(parent, fspath=path) # type: DoctestTextfile
txt: DoctestTextfile = DoctestTextfile.from_parent(parent, fspath=path)
return txt
return None
@@ -163,7 +163,7 @@ class ReprFailDoctest(TerminalRepr):
class MultipleDoctestFailures(Exception):
def __init__(self, failures: "Sequence[doctest.DocTestFailure]") -> None:
def __init__(self, failures: Sequence["doctest.DocTestFailure"]) -> None:
super().__init__()
self.failures = failures
@@ -180,7 +180,7 @@ def _init_runner_class() -> Type["doctest.DocTestRunner"]:
def __init__(
self,
checker: Optional[doctest.OutputChecker] = None,
checker: Optional["doctest.OutputChecker"] = None,
verbose: Optional[bool] = None,
optionflags: int = 0,
continue_on_failure: bool = True,
@@ -251,7 +251,7 @@ class DoctestItem(pytest.Item):
self.runner = runner
self.dtest = dtest
self.obj = None
self.fixture_request = None # type: Optional[FixtureRequest]
self.fixture_request: Optional[FixtureRequest] = None
@classmethod
def from_parent( # type: ignore
@@ -281,7 +281,7 @@ class DoctestItem(pytest.Item):
assert self.runner is not None
_check_all_skipped(self.dtest)
self._disable_output_capturing_for_darwin()
failures = [] # type: List[doctest.DocTestFailure]
failures: List["doctest.DocTestFailure"] = []
# Type ignored because we change the type of `out` from what
# doctest expects.
self.runner.run(self.dtest, out=failures) # type: ignore[arg-type]
@@ -305,9 +305,9 @@ class DoctestItem(pytest.Item):
) -> Union[str, TerminalRepr]:
import doctest
failures = (
None
) # type: Optional[Sequence[Union[doctest.DocTestFailure, doctest.UnexpectedException]]]
failures: Optional[
Sequence[Union[doctest.DocTestFailure, doctest.UnexpectedException]]
] = (None)
if isinstance(
excinfo.value, (doctest.DocTestFailure, doctest.UnexpectedException)
):
@@ -636,8 +636,8 @@ def _init_checker_class() -> Type["doctest.OutputChecker"]:
return got
offset = 0
for w, g in zip(wants, gots):
fraction = w.group("fraction") # type: Optional[str]
exponent = w.group("exponent1") # type: Optional[str]
fraction: Optional[str] = w.group("fraction")
exponent: Optional[str] = w.group("exponent1")
if exponent is None:
exponent = w.group("exponent2")
if fraction is None:

View File

@@ -160,8 +160,8 @@ def add_funcarg_pseudo_fixture_def(
# This function call does not have direct parametrization.
return
# Collect funcargs of all callspecs into a list of values.
arg2params = {} # type: Dict[str, List[object]]
arg2scope = {} # type: Dict[str, _Scope]
arg2params: Dict[str, List[object]] = {}
arg2scope: Dict[str, _Scope] = {}
for callspec in metafunc._calls:
for argname, argvalue in callspec.funcargs.items():
assert argname not in callspec.params
@@ -219,9 +219,9 @@ def getfixturemarker(obj: object) -> Optional["FixtureFunctionMarker"]:
"""Return fixturemarker or None if it doesn't exist or raised
exceptions."""
try:
fixturemarker = getattr(
fixturemarker: Optional[FixtureFunctionMarker] = getattr(
obj, "_pytestfixturefunction", None
) # type: Optional[FixtureFunctionMarker]
)
except TEST_OUTCOME:
# some objects raise errors like request (from flask import request)
# we don't expect them to be fixture functions
@@ -242,7 +242,7 @@ def get_parametrized_fixture_keys(item: nodes.Item, scopenum: int) -> Iterator[_
except AttributeError:
pass
else:
cs = callspec # type: CallSpec2
cs: CallSpec2 = callspec
# cs.indices.items() is random order of argnames. Need to
# sort this so that different calls to
# get_parametrized_fixture_keys will be deterministic.
@@ -250,7 +250,7 @@ def get_parametrized_fixture_keys(item: nodes.Item, scopenum: int) -> Iterator[_
if cs._arg2scopenum[argname] != scopenum:
continue
if scopenum == 0: # session
key = (argname, param_index) # type: _Key
key: _Key = (argname, param_index)
elif scopenum == 1: # package
key = (argname, param_index, item.fspath.dirpath())
elif scopenum == 2: # module
@@ -268,12 +268,12 @@ def get_parametrized_fixture_keys(item: nodes.Item, scopenum: int) -> Iterator[_
def reorder_items(items: Sequence[nodes.Item]) -> List[nodes.Item]:
argkeys_cache = {} # type: Dict[int, Dict[nodes.Item, Dict[_Key, None]]]
items_by_argkey = {} # type: Dict[int, Dict[_Key, Deque[nodes.Item]]]
argkeys_cache: Dict[int, Dict[nodes.Item, Dict[_Key, None]]] = {}
items_by_argkey: Dict[int, Dict[_Key, Deque[nodes.Item]]] = {}
for scopenum in range(0, scopenum_function):
d = {} # type: Dict[nodes.Item, Dict[_Key, None]]
d: Dict[nodes.Item, Dict[_Key, None]] = {}
argkeys_cache[scopenum] = d
item_d = defaultdict(deque) # type: Dict[_Key, Deque[nodes.Item]]
item_d: Dict[_Key, Deque[nodes.Item]] = defaultdict(deque)
items_by_argkey[scopenum] = item_d
for item in items:
# cast is a workaround for https://github.com/python/typeshed/issues/3800.
@@ -312,13 +312,13 @@ def reorder_items_atscope(
) -> Dict[nodes.Item, None]:
if scopenum >= scopenum_function or len(items) < 3:
return items
ignore = set() # type: Set[Optional[_Key]]
ignore: Set[Optional[_Key]] = set()
items_deque = deque(items)
items_done = order_preserving_dict() # type: Dict[nodes.Item, None]
items_done: Dict[nodes.Item, None] = order_preserving_dict()
scoped_items_by_argkey = items_by_argkey[scopenum]
scoped_argkeys_cache = argkeys_cache[scopenum]
while items_deque:
no_argkey_group = order_preserving_dict() # type: Dict[nodes.Item, None]
no_argkey_group: Dict[nodes.Item, None] = order_preserving_dict()
slicing_argkey = None
while items_deque:
item = items_deque.popleft()
@@ -400,7 +400,7 @@ class FuncFixtureInfo:
tree. In this way the dependency tree can get pruned, and the closure
of argnames may get reduced.
"""
closure = set() # type: Set[str]
closure: Set[str] = set()
working_set = set(self.initialnames)
while working_set:
argname = working_set.pop()
@@ -428,16 +428,14 @@ class FixtureRequest:
def __init__(self, pyfuncitem) -> None:
self._pyfuncitem = pyfuncitem
#: Fixture for which this request is being performed.
self.fixturename = None # type: Optional[str]
self.fixturename: Optional[str] = None
#: Scope string, one of "function", "class", "module", "session".
self.scope = "function" # type: _Scope
self._fixture_defs = {} # type: Dict[str, FixtureDef[Any]]
fixtureinfo = pyfuncitem._fixtureinfo # type: FuncFixtureInfo
self.scope: _Scope = "function"
self._fixture_defs: Dict[str, FixtureDef[Any]] = {}
fixtureinfo: FuncFixtureInfo = pyfuncitem._fixtureinfo
self._arg2fixturedefs = fixtureinfo.name2fixturedefs.copy()
self._arg2index = {} # type: Dict[str, int]
self._fixturemanager = (
pyfuncitem.session._fixturemanager
) # type: FixtureManager
self._arg2index: Dict[str, int] = {}
self._fixturemanager: FixtureManager = (pyfuncitem.session._fixturemanager)
@property
def fixturenames(self) -> List[str]:
@@ -589,7 +587,7 @@ class FixtureRequest:
except FixtureLookupError:
if argname == "request":
cached_result = (self, [0], None)
scope = "function" # type: _Scope
scope: _Scope = "function"
return PseudoFixtureDef(cached_result, scope)
raise
# Remove indent to prevent the python3 exception
@@ -600,7 +598,7 @@ class FixtureRequest:
def _get_fixturestack(self) -> List["FixtureDef[Any]"]:
current = self
values = [] # type: List[FixtureDef[Any]]
values: List[FixtureDef[Any]] = []
while 1:
fixturedef = getattr(current, "_fixturedef", None)
if fixturedef is None:
@@ -782,7 +780,7 @@ class SubRequest(FixtureRequest):
super()._schedule_finalizers(fixturedef, subrequest)
scopes = ["session", "package", "module", "class", "function"] # type: List[_Scope]
scopes: List["_Scope"] = ["session", "package", "module", "class", "function"]
scopenum_function = scopes.index("function")
@@ -793,7 +791,7 @@ def scopemismatch(currentscope: "_Scope", newscope: "_Scope") -> bool:
def scope2index(scope: str, descr: str, where: Optional[str] = None) -> int:
"""Look up the index of ``scope`` and raise a descriptive value error
if not defined."""
strscopes = scopes # type: Sequence[str]
strscopes: Sequence[str] = scopes
try:
return strscopes.index(scope)
except ValueError:
@@ -818,7 +816,7 @@ class FixtureLookupError(LookupError):
self.msg = msg
def formatrepr(self) -> "FixtureLookupErrorRepr":
tblines = [] # type: List[str]
tblines: List[str] = []
addline = tblines.append
stack = [self.request._pyfuncitem.obj]
stack.extend(map(lambda x: x.func, self.fixturestack))
@@ -995,14 +993,14 @@ class FixtureDef(Generic[_FixtureValue]):
where=baseid,
)
self.scope = scope_
self.params = params # type: Optional[Sequence[object]]
self.argnames = getfuncargnames(
self.params: Optional[Sequence[object]] = params
self.argnames: Tuple[str, ...] = getfuncargnames(
func, name=argname, is_method=unittest
) # type: Tuple[str, ...]
)
self.unittest = unittest
self.ids = ids
self.cached_result = None # type: Optional[_FixtureCachedResult[_FixtureValue]]
self._finalizers = [] # type: List[Callable[[], object]]
self.cached_result: Optional[_FixtureCachedResult[_FixtureValue]] = None
self._finalizers: List[Callable[[], object]] = []
def addfinalizer(self, finalizer: Callable[[], object]) -> None:
self._finalizers.append(finalizer)
@@ -1408,12 +1406,12 @@ class FixtureManager:
def __init__(self, session: "Session") -> None:
self.session = session
self.config = session.config # type: Config
self._arg2fixturedefs = {} # type: Dict[str, List[FixtureDef[Any]]]
self._holderobjseen = set() # type: Set[object]
self._nodeid_and_autousenames = [
self.config: Config = session.config
self._arg2fixturedefs: Dict[str, List[FixtureDef[Any]]] = {}
self._holderobjseen: Set[object] = set()
self._nodeid_and_autousenames: List[Tuple[str, List[str]]] = [
("", self.config.getini("usefixtures"))
] # type: List[Tuple[str, List[str]]]
]
session.config.pluginmanager.register(self, "funcmanage")
def _get_direct_parametrize_args(self, node: nodes.Node) -> List[str]:
@@ -1425,7 +1423,7 @@ class FixtureManager:
These things are done later as well when dealing with parametrization
so this could be improved.
"""
parametrize_argnames = [] # type: List[str]
parametrize_argnames: List[str] = []
for marker in node.iter_markers(name="parametrize"):
if not marker.kwargs.get("indirect", False):
p_argnames, _ = ParameterSet._parse_parametrize_args(
@@ -1477,7 +1475,7 @@ class FixtureManager:
def _getautousenames(self, nodeid: str) -> List[str]:
"""Return a list of fixture names to be used."""
autousenames = [] # type: List[str]
autousenames: List[str] = []
for baseid, basenames in self._nodeid_and_autousenames:
if nodeid.startswith(baseid):
if baseid:
@@ -1516,7 +1514,7 @@ class FixtureManager:
# need to return it as well, so save this.
initialnames = tuple(fixturenames_closure)
arg2fixturedefs = {} # type: Dict[str, Sequence[FixtureDef[Any]]]
arg2fixturedefs: Dict[str, Sequence[FixtureDef[Any]]] = {}
lastlen = -1
while lastlen != len(fixturenames_closure):
lastlen = len(fixturenames_closure)

View File

@@ -97,7 +97,7 @@ def pytest_addoption(parser: Parser) -> None:
@pytest.hookimpl(hookwrapper=True)
def pytest_cmdline_parse():
outcome = yield
config = outcome.get_result() # type: Config
config: Config = outcome.get_result()
if config.option.debug:
path = os.path.abspath("pytestdebug.log")
debugfile = open(path, "w")

View File

@@ -93,9 +93,9 @@ class _NodeReporter:
self.add_stats = self.xml.add_stats
self.family = self.xml.family
self.duration = 0
self.properties = [] # type: List[Tuple[str, str]]
self.nodes = [] # type: List[ET.Element]
self.attrs = {} # type: Dict[str, str]
self.properties: List[Tuple[str, str]] = []
self.nodes: List[ET.Element] = []
self.attrs: Dict[str, str] = {}
def append(self, node: ET.Element) -> None:
self.xml.add_stats(node.tag)
@@ -122,11 +122,11 @@ class _NodeReporter:
classnames = names[:-1]
if self.xml.prefix:
classnames.insert(0, self.xml.prefix)
attrs = {
attrs: Dict[str, str] = {
"classname": ".".join(classnames),
"name": bin_xml_escape(names[-1]),
"file": testreport.location[0],
} # type: Dict[str, str]
}
if testreport.location[1] is not None:
attrs["line"] = str(testreport.location[1])
if hasattr(testreport, "url"):
@@ -199,9 +199,9 @@ class _NodeReporter:
self._add_simple("skipped", "xfail-marked test passes unexpectedly")
else:
assert report.longrepr is not None
reprcrash = getattr(
reprcrash: Optional[ReprFileLocation] = getattr(
report.longrepr, "reprcrash", None
) # type: Optional[ReprFileLocation]
)
if reprcrash is not None:
message = reprcrash.message
else:
@@ -219,9 +219,9 @@ class _NodeReporter:
def append_error(self, report: TestReport) -> None:
assert report.longrepr is not None
reprcrash = getattr(
reprcrash: Optional[ReprFileLocation] = getattr(
report.longrepr, "reprcrash", None
) # type: Optional[ReprFileLocation]
)
if reprcrash is not None:
reason = reprcrash.message
else:
@@ -481,17 +481,17 @@ class LogXML:
self.log_passing_tests = log_passing_tests
self.report_duration = report_duration
self.family = family
self.stats = dict.fromkeys(
self.stats: Dict[str, int] = dict.fromkeys(
["error", "passed", "failure", "skipped"], 0
) # type: Dict[str, int]
self.node_reporters = (
{}
) # type: Dict[Tuple[Union[str, TestReport], object], _NodeReporter]
self.node_reporters_ordered = [] # type: List[_NodeReporter]
self.global_properties = [] # type: List[Tuple[str, str]]
)
self.node_reporters: Dict[
Tuple[Union[str, TestReport], object], _NodeReporter
] = ({})
self.node_reporters_ordered: List[_NodeReporter] = []
self.global_properties: List[Tuple[str, str]] = []
# List of reports that failed on call but teardown is pending.
self.open_reports = [] # type: List[TestReport]
self.open_reports: List[TestReport] = []
self.cnt_double_fail_tests = 0
# Replaces convenience family with real family.
@@ -507,7 +507,7 @@ class LogXML:
reporter.finalize()
def node_reporter(self, report: Union[TestReport, str]) -> _NodeReporter:
nodeid = getattr(report, "nodeid", report) # type: Union[str, TestReport]
nodeid: Union[str, TestReport] = getattr(report, "nodeid", report)
# Local hack to handle xdist report order.
workernode = getattr(report, "node", None)

View File

@@ -47,7 +47,7 @@ class ColoredLevelFormatter(logging.Formatter):
"""A logging formatter which colorizes the %(levelname)..s part of the
log format passed to __init__."""
LOGLEVEL_COLOROPTS = {
LOGLEVEL_COLOROPTS: Mapping[int, AbstractSet[str]] = {
logging.CRITICAL: {"red"},
logging.ERROR: {"red", "bold"},
logging.WARNING: {"yellow"},
@@ -55,13 +55,13 @@ class ColoredLevelFormatter(logging.Formatter):
logging.INFO: {"green"},
logging.DEBUG: {"purple"},
logging.NOTSET: set(),
} # type: Mapping[int, AbstractSet[str]]
}
LEVELNAME_FMT_REGEX = re.compile(r"%\(levelname\)([+-.]?\d*s)")
def __init__(self, terminalwriter: TerminalWriter, *args, **kwargs) -> None:
super().__init__(*args, **kwargs)
self._original_fmt = self._style._fmt
self._level_to_fmt_mapping = {} # type: Dict[int, str]
self._level_to_fmt_mapping: Dict[int, str] = {}
assert self._fmt is not None
levelname_fmt_match = self.LEVELNAME_FMT_REGEX.search(self._fmt)
@@ -315,12 +315,12 @@ class catching_logs:
class LogCaptureHandler(logging.StreamHandler):
"""A logging handler that stores log records and the log text."""
stream = None # type: StringIO
stream: StringIO
def __init__(self) -> None:
"""Create a new log handler."""
super().__init__(StringIO())
self.records = [] # type: List[logging.LogRecord]
self.records: List[logging.LogRecord] = []
def emit(self, record: logging.LogRecord) -> None:
"""Keep the log records in a list in addition to the log text."""
@@ -346,9 +346,9 @@ class LogCaptureFixture:
def __init__(self, item: nodes.Node) -> None:
self._item = item
self._initial_handler_level = None # type: Optional[int]
self._initial_handler_level: Optional[int] = None
# Dict of log name -> log level.
self._initial_logger_levels = {} # type: Dict[Optional[str], int]
self._initial_logger_levels: Dict[Optional[str], int] = {}
def _finalize(self) -> None:
"""Finalize the fixture.
@@ -564,9 +564,9 @@ class LoggingPlugin:
terminal_reporter = config.pluginmanager.get_plugin("terminalreporter")
capture_manager = config.pluginmanager.get_plugin("capturemanager")
# if capturemanager plugin is disabled, live logging still works.
self.log_cli_handler = _LiveLoggingStreamHandler(
terminal_reporter, capture_manager
) # type: Union[_LiveLoggingStreamHandler, _LiveLoggingNullHandler]
self.log_cli_handler: Union[
_LiveLoggingStreamHandler, _LiveLoggingNullHandler
] = _LiveLoggingStreamHandler(terminal_reporter, capture_manager)
else:
self.log_cli_handler = _LiveLoggingNullHandler()
log_cli_formatter = self._create_formatter(
@@ -582,9 +582,9 @@ class LoggingPlugin:
if color != "no" and ColoredLevelFormatter.LEVELNAME_FMT_REGEX.search(
log_format
):
formatter = ColoredLevelFormatter(
formatter: logging.Formatter = ColoredLevelFormatter(
create_terminal_writer(self._config), log_format, log_date_format
) # type: logging.Formatter
)
else:
formatter = logging.Formatter(log_format, log_date_format)
@@ -699,7 +699,7 @@ class LoggingPlugin:
def pytest_runtest_setup(self, item: nodes.Item) -> Generator[None, None, None]:
self.log_cli_handler.set_when("setup")
empty = {} # type: Dict[str, List[logging.LogRecord]]
empty: Dict[str, List[logging.LogRecord]] = {}
item._store[caplog_records_key] = empty
yield from self._runtest_for(item, "setup")
@@ -755,7 +755,7 @@ class _LiveLoggingStreamHandler(logging.StreamHandler):
# Officially stream needs to be a IO[str], but TerminalReporter
# isn't. So force it.
stream = None # type: TerminalReporter # type: ignore
stream: TerminalReporter = None # type: ignore
def __init__(
self,

View File

@@ -262,7 +262,7 @@ def wrap_session(
session.exitstatus = ExitCode.TESTS_FAILED
except (KeyboardInterrupt, exit.Exception):
excinfo = _pytest._code.ExceptionInfo.from_current()
exitstatus = ExitCode.INTERRUPTED # type: Union[int, ExitCode]
exitstatus: Union[int, ExitCode] = ExitCode.INTERRUPTED
if isinstance(excinfo.value, exit.Exception):
if excinfo.value.returncode is not None:
exitstatus = excinfo.value.returncode
@@ -439,10 +439,10 @@ class Session(nodes.FSCollector):
Interrupted = Interrupted
Failed = Failed
# Set on the session by runner.pytest_sessionstart.
_setupstate = None # type: SetupState
_setupstate: SetupState
# Set on the session by fixtures.pytest_sessionstart.
_fixturemanager = None # type: FixtureManager
exitstatus = None # type: Union[int, ExitCode]
_fixturemanager: FixtureManager
exitstatus: Union[int, ExitCode]
def __init__(self, config: Config) -> None:
super().__init__(
@@ -450,21 +450,19 @@ class Session(nodes.FSCollector):
)
self.testsfailed = 0
self.testscollected = 0
self.shouldstop = False # type: Union[bool, str]
self.shouldfail = False # type: Union[bool, str]
self.shouldstop: Union[bool, str] = False
self.shouldfail: Union[bool, str] = False
self.trace = config.trace.root.get("collection")
self.startdir = config.invocation_dir
self._initialpaths = frozenset() # type: FrozenSet[py.path.local]
self._initialpaths: FrozenSet[py.path.local] = frozenset()
self._bestrelpathcache = _bestrelpath_cache(
config.rootpath
) # type: Dict[Path, str]
self._bestrelpathcache: Dict[Path, str] = _bestrelpath_cache(config.rootpath)
self.config.pluginmanager.register(self, name="session")
@classmethod
def from_config(cls, config: Config) -> "Session":
session = cls._create(config) # type: Session
session: Session = cls._create(config)
return session
def __repr__(self) -> str:
@@ -589,15 +587,15 @@ class Session(nodes.FSCollector):
self.trace("perform_collect", self, args)
self.trace.root.indent += 1
self._notfound = [] # type: List[Tuple[str, Sequence[nodes.Collector]]]
self._initial_parts = [] # type: List[Tuple[py.path.local, List[str]]]
self.items = [] # type: List[nodes.Item]
self._notfound: List[Tuple[str, Sequence[nodes.Collector]]] = []
self._initial_parts: List[Tuple[py.path.local, List[str]]] = []
self.items: List[nodes.Item] = []
hook = self.config.hook
items = self.items # type: Sequence[Union[nodes.Item, nodes.Collector]]
items: Sequence[Union[nodes.Item, nodes.Collector]] = self.items
try:
initialpaths = [] # type: List[py.path.local]
initialpaths: List[py.path.local] = []
for arg in args:
fspath, parts = resolve_collection_argument(
self.config.invocation_params.dir,
@@ -637,19 +635,17 @@ class Session(nodes.FSCollector):
from _pytest.python import Package
# Keep track of any collected nodes in here, so we don't duplicate fixtures.
node_cache1 = {} # type: Dict[py.path.local, Sequence[nodes.Collector]]
node_cache2 = (
{}
) # type: Dict[Tuple[Type[nodes.Collector], py.path.local], nodes.Collector]
node_cache1: Dict[py.path.local, Sequence[nodes.Collector]] = {}
node_cache2: Dict[
Tuple[Type[nodes.Collector], py.path.local], nodes.Collector
] = ({})
# Keep track of any collected collectors in matchnodes paths, so they
# are not collected more than once.
matchnodes_cache = (
{}
) # type: Dict[Tuple[Type[nodes.Collector], str], CollectReport]
matchnodes_cache: Dict[Tuple[Type[nodes.Collector], str], CollectReport] = ({})
# Dirnames of pkgs with dunder-init files.
pkg_roots = {} # type: Dict[str, Package]
pkg_roots: Dict[str, Package] = {}
for argpath, names in self._initial_parts:
self.trace("processing argument", (argpath, names))
@@ -678,7 +674,7 @@ class Session(nodes.FSCollector):
if argpath.check(dir=1):
assert not names, "invalid arg {!r}".format((argpath, names))
seen_dirs = set() # type: Set[py.path.local]
seen_dirs: Set[py.path.local] = set()
for direntry in visit(str(argpath), self._recurse):
if not direntry.is_file():
continue
@@ -718,9 +714,9 @@ class Session(nodes.FSCollector):
node_cache1[argpath] = col
matching = []
work = [
(col, names)
] # type: List[Tuple[Sequence[Union[nodes.Item, nodes.Collector]], Sequence[str]]]
work: List[
Tuple[Sequence[Union[nodes.Item, nodes.Collector]], Sequence[str]]
] = [(col, names)]
while work:
self.trace("matchnodes", col, names)
self.trace.root.indent += 1

View File

@@ -133,7 +133,7 @@ IDENT_PREFIX = "$"
def expression(s: Scanner) -> ast.Expression:
if s.accept(TokenType.EOF):
ret = ast.NameConstant(False) # type: ast.expr
ret: ast.expr = ast.NameConstant(False)
else:
ret = expr(s)
s.accept(TokenType.EOF, reject=True)
@@ -203,9 +203,9 @@ class Expression:
:param input: The input expression - one line.
"""
astexpr = expression(Scanner(input))
code = compile(
code: types.CodeType = compile(
astexpr, filename="<pytest match expression>", mode="eval",
) # type: types.CodeType
)
return Expression(code)
def evaluate(self, matcher: Callable[[str], bool]) -> bool:
@@ -217,7 +217,5 @@ class Expression:
:returns: Whether the expression matches or not.
"""
ret = eval(
self.code, {"__builtins__": {}}, MatcherAdapter(matcher)
) # type: bool
ret: bool = eval(self.code, {"__builtins__": {}}, MatcherAdapter(matcher))
return ret

View File

@@ -127,7 +127,7 @@ class ParameterSet(
return cls.param(parameterset)
else:
# TODO: Refactor to fix this type-ignore. Currently the following
# type-checks but crashes:
# passes type-checking but crashes:
#
# @pytest.mark.parametrize(('x', 'y'), [1, 2])
# def test_foo(x, y): pass
@@ -231,7 +231,7 @@ class Mark:
assert self.name == other.name
# Remember source of ids with parametrize Marks.
param_ids_from = None # type: Optional[Mark]
param_ids_from: Optional[Mark] = None
if self.name == "parametrize":
if other._has_param_ids():
param_ids_from = other
@@ -465,8 +465,8 @@ class MarkGenerator:
applies a 'slowtest' :class:`Mark` on ``test_function``.
"""
_config = None # type: Optional[Config]
_markers = set() # type: Set[str]
_config: Optional[Config] = None
_markers: Set[str] = set()
# See TYPE_CHECKING above.
if TYPE_CHECKING:

View File

@@ -115,12 +115,10 @@ class MonkeyPatch:
setattr/item/env/syspath changes."""
def __init__(self) -> None:
self._setattr = [] # type: List[Tuple[object, str, object]]
self._setitem = (
[]
) # type: List[Tuple[MutableMapping[Any, Any], object, object]]
self._cwd = None # type: Optional[str]
self._savesyspath = None # type: Optional[List[str]]
self._setattr: List[Tuple[object, str, object]] = []
self._setitem: List[Tuple[MutableMapping[Any, Any], object, object]] = ([])
self._cwd: Optional[str] = None
self._savesyspath: Optional[List[str]] = None
@contextmanager
def context(self) -> Generator["MonkeyPatch", None, None]:
@@ -292,7 +290,7 @@ class MonkeyPatch:
Raises ``KeyError`` if it does not exist, unless ``raising`` is set to
False.
"""
environ = os.environ # type: MutableMapping[str, str]
environ: MutableMapping[str, str] = os.environ
self.delitem(environ, name, raising=raising)
def syspath_prepend(self, path) -> None:

View File

@@ -140,7 +140,7 @@ class Node(metaclass=NodeMeta):
#: The pytest config object.
if config:
self.config = config # type: Config
self.config: Config = config
else:
if not parent:
raise TypeError("config or parent must be provided")
@@ -161,10 +161,10 @@ class Node(metaclass=NodeMeta):
self.keywords = NodeKeywords(self)
#: The marker objects belonging to this node.
self.own_markers = [] # type: List[Mark]
self.own_markers: List[Mark] = []
#: Allow adding of extra keywords to use for matching.
self.extra_keyword_matches = set() # type: Set[str]
self.extra_keyword_matches: Set[str] = set()
if nodeid is not None:
assert "::()" not in nodeid
@@ -256,7 +256,7 @@ class Node(metaclass=NodeMeta):
"""Return list of all parent collectors up to self, starting from
the root of collection tree."""
chain = []
item = self # type: Optional[Node]
item: Optional[Node] = self
while item is not None:
chain.append(item)
item = item.parent
@@ -326,7 +326,7 @@ class Node(metaclass=NodeMeta):
def listextrakeywords(self) -> Set[str]:
"""Return a set of all extra keywords in self and any parents."""
extra_keywords = set() # type: Set[str]
extra_keywords: Set[str] = set()
for item in self.listchain():
extra_keywords.update(item.extra_keyword_matches)
return extra_keywords
@@ -345,7 +345,7 @@ class Node(metaclass=NodeMeta):
def getparent(self, cls: Type[_NodeType]) -> Optional[_NodeType]:
"""Get the next parent node (including self) which is an instance of
the given class."""
current = self # type: Optional[Node]
current: Optional[Node] = self
while current and not isinstance(current, cls):
current = current.parent
assert current is None or isinstance(current, cls)
@@ -433,9 +433,7 @@ def get_fslocation_from_item(
:rtype: A tuple of (str|py.path.local, int) with filename and line number.
"""
# See Item.location.
location = getattr(
node, "location", None
) # type: Optional[Tuple[str, Optional[int], str]]
location: Optional[Tuple[str, Optional[int], str]] = getattr(node, "location", None)
if location is not None:
return location[:2]
obj = getattr(node, "obj", None)
@@ -560,11 +558,11 @@ class Item(Node):
nodeid: Optional[str] = None,
) -> None:
super().__init__(name, parent, config, session, nodeid=nodeid)
self._report_sections = [] # type: List[Tuple[str, str, str]]
self._report_sections: List[Tuple[str, str, str]] = []
#: A list of tuples (name, value) that holds user defined properties
#: for this test.
self.user_properties = [] # type: List[Tuple[str, object]]
self.user_properties: List[Tuple[str, object]] = []
def runtest(self) -> None:
raise NotImplementedError("runtest must be implemented by Item subclass")

View File

@@ -88,8 +88,8 @@ _ET = TypeVar("_ET", bound=Type[BaseException])
class _WithException(Protocol[_F, _ET]):
Exception = None # type: _ET
__call__ = None # type: _F
Exception: _ET
__call__: _F
def _with_exception(exception_type: _ET) -> Callable[[_F], _WithException[_F, _ET]]:

View File

@@ -79,9 +79,9 @@ def create_new_paste(contents: Union[str, bytes]) -> str:
params = {"code": contents, "lexer": "text", "expiry": "1week"}
url = "https://bpaste.net"
try:
response = (
response: str = (
urlopen(url, data=urlencode(params).encode("ascii")).read().decode("utf-8")
) # type: str
)
except OSError as exc_info: # urllib errors
return "bad response: %s" % exc_info
m = re.search(r'href="/raw/(\w+)"', response)

View File

@@ -214,8 +214,8 @@ class HookRecorder:
def __init__(self, pluginmanager: PytestPluginManager) -> None:
self._pluginmanager = pluginmanager
self.calls = [] # type: List[ParsedCall]
self.ret = None # type: Optional[Union[int, ExitCode]]
self.calls: List[ParsedCall] = []
self.ret: Optional[Union[int, ExitCode]] = None
def before(hook_name: str, hook_impls, kwargs) -> None:
self.calls.append(ParsedCall(hook_name, kwargs))
@@ -474,7 +474,7 @@ class RunResult:
duration: float,
) -> None:
try:
self.ret = pytest.ExitCode(ret) # type: Union[int, ExitCode]
self.ret: Union[int, ExitCode] = pytest.ExitCode(ret)
"""The return value."""
except ValueError:
self.ret = ret
@@ -626,17 +626,17 @@ class Testdir:
def __init__(self, request: FixtureRequest, tmpdir_factory: TempdirFactory) -> None:
self.request = request
self._mod_collections = (
WeakKeyDictionary()
) # type: WeakKeyDictionary[Module, List[Union[Item, Collector]]]
self._mod_collections: WeakKeyDictionary[
Module, List[Union[Item, Collector]]
] = (WeakKeyDictionary())
if request.function:
name = request.function.__name__ # type: str
name: str = request.function.__name__
else:
name = request.node.name
self._name = name
self.tmpdir = tmpdir_factory.mktemp(name, numbered=True)
self.test_tmproot = tmpdir_factory.mktemp("tmp-" + name, numbered=True)
self.plugins = [] # type: List[Union[str, _PluggyPlugin]]
self.plugins: List[Union[str, _PluggyPlugin]] = []
self._cwd_snapshot = CwdSnapshot()
self._sys_path_snapshot = SysPathsSnapshot()
self._sys_modules_snapshot = self.__take_sys_modules_snapshot()
@@ -919,7 +919,7 @@ class Testdir:
test items contained within.
"""
session = colitems[0].session
result = [] # type: List[Item]
result: List[Item] = []
for colitem in colitems:
result.extend(session.genitems(colitem))
return result
@@ -1437,7 +1437,7 @@ class LineMatcher:
def __init__(self, lines: List[str]) -> None:
self.lines = lines
self._log_output = [] # type: List[str]
self._log_output: List[str] = []
def _getlines(self, lines2: Union[str, Sequence[str], Source]) -> Sequence[str]:
if isinstance(lines2, str):

View File

@@ -197,9 +197,7 @@ def pytest_collect_file(
):
return None
ihook = parent.session.gethookproxy(path)
module = ihook.pytest_pycollect_makemodule(
path=path, parent=parent
) # type: Module
module: Module = ihook.pytest_pycollect_makemodule(path=path, parent=parent)
return module
return None
@@ -211,9 +209,9 @@ def path_matches_patterns(path: py.path.local, patterns: Iterable[str]) -> bool:
def pytest_pycollect_makemodule(path: py.path.local, parent) -> "Module":
if path.basename == "__init__.py":
pkg = Package.from_parent(parent, fspath=path) # type: Package
pkg: Package = Package.from_parent(parent, fspath=path)
return pkg
mod = Module.from_parent(parent, fspath=path) # type: Module
mod: Module = Module.from_parent(parent, fspath=path)
return mod
@@ -257,9 +255,9 @@ class PyobjMixin:
# Function and attributes that the mixin needs (for type-checking only).
if TYPE_CHECKING:
name = "" # type: str
parent = None # type: Optional[nodes.Node]
own_markers = [] # type: List[Mark]
name: str = ""
parent: Optional[nodes.Node] = None
own_markers: List[Mark] = []
def getparent(self, cls: Type[nodes._NodeType]) -> Optional[nodes._NodeType]:
...
@@ -336,7 +334,7 @@ class PyobjMixin:
file_path = sys.modules[obj.__module__].__file__
if file_path.endswith(".pyc"):
file_path = file_path[:-1]
fspath = file_path # type: Union[py.path.local, str]
fspath: Union[py.path.local, str] = file_path
lineno = compat_co_firstlineno
else:
fspath, lineno = getfslineno(obj)
@@ -420,8 +418,8 @@ class PyCollector(PyobjMixin, nodes.Collector):
dicts = [getattr(self.obj, "__dict__", {})]
for basecls in self.obj.__class__.__mro__:
dicts.append(basecls.__dict__)
seen = set() # type: Set[str]
values = [] # type: List[Union[nodes.Item, nodes.Collector]]
seen: Set[str] = set()
values: List[Union[nodes.Item, nodes.Collector]] = []
ihook = self.ihook
for dic in dicts:
# Note: seems like the dict can change during iteration -
@@ -696,7 +694,7 @@ class Package(Module):
init_module, self.config.getini("python_files")
):
yield Module.from_parent(self, fspath=init_module)
pkg_prefixes = set() # type: Set[py.path.local]
pkg_prefixes: Set[py.path.local] = set()
for direntry in visit(str(this_path), recurse=self._recurse):
path = py.path.local(direntry.path)
@@ -851,14 +849,14 @@ class Instance(PyCollector):
def hasinit(obj: object) -> bool:
init = getattr(obj, "__init__", None) # type: object
init: object = getattr(obj, "__init__", None)
if init:
return init != object.__init__
return False
def hasnew(obj: object) -> bool:
new = getattr(obj, "__new__", None) # type: object
new: object = getattr(obj, "__new__", None)
if new:
return new != object.__new__
return False
@@ -868,13 +866,13 @@ def hasnew(obj: object) -> bool:
class CallSpec2:
def __init__(self, metafunc: "Metafunc") -> None:
self.metafunc = metafunc
self.funcargs = {} # type: Dict[str, object]
self._idlist = [] # type: List[str]
self.params = {} # type: Dict[str, object]
self.funcargs: Dict[str, object] = {}
self._idlist: List[str] = []
self.params: Dict[str, object] = {}
# Used for sorting parametrized resources.
self._arg2scopenum = {} # type: Dict[str, int]
self.marks = [] # type: List[Mark]
self.indices = {} # type: Dict[str, int]
self._arg2scopenum: Dict[str, int] = {}
self.marks: List[Mark] = []
self.indices: Dict[str, int] = {}
def copy(self) -> "CallSpec2":
cs = CallSpec2(self.metafunc)
@@ -959,7 +957,7 @@ class Metafunc:
#: Class object where the test function is defined in or ``None``.
self.cls = cls
self._calls = [] # type: List[CallSpec2]
self._calls: List[CallSpec2] = []
self._arg2fixturedefs = fixtureinfo.name2fixturedefs
def parametrize(
@@ -1175,9 +1173,9 @@ class Metafunc:
* "funcargs" if the argname should be a parameter to the parametrized test function.
"""
if isinstance(indirect, bool):
valtypes = dict.fromkeys(
valtypes: Dict[str, Literal["params", "funcargs"]] = dict.fromkeys(
argnames, "params" if indirect else "funcargs"
) # type: Dict[str, Literal["params", "funcargs"]]
)
elif isinstance(indirect, Sequence):
valtypes = dict.fromkeys(argnames, "funcargs")
for arg in indirect:
@@ -1296,9 +1294,9 @@ def _idval(
msg = prefix + msg.format(argname, idx)
raise ValueError(msg) from e
elif config:
hook_id = config.hook.pytest_make_parametrize_id(
hook_id: Optional[str] = config.hook.pytest_make_parametrize_id(
config=config, val=val, argname=argname
) # type: Optional[str]
)
if hook_id:
return hook_id
@@ -1315,7 +1313,7 @@ def _idval(
return str(val)
elif isinstance(getattr(val, "__name__", None), str):
# Name of a class, function, module, etc.
name = getattr(val, "__name__") # type: str
name: str = getattr(val, "__name__")
return name
return str(argname) + str(idx)
@@ -1365,7 +1363,7 @@ def idmaker(
test_id_counts = Counter(resolved_ids)
# Map the test ID to its next suffix.
test_id_suffixes = defaultdict(int) # type: Dict[str, int]
test_id_suffixes: Dict[str, int] = defaultdict(int)
# Suffix non-unique IDs to make them unique.
for index, test_id in enumerate(resolved_ids):
@@ -1412,7 +1410,7 @@ def _show_fixtures_per_test(config: Config, session: Session) -> None:
def write_item(item: nodes.Item) -> None:
# Not all items have _fixtureinfo attribute.
info = getattr(item, "_fixtureinfo", None) # type: Optional[FuncFixtureInfo]
info: Optional[FuncFixtureInfo] = getattr(item, "_fixtureinfo", None)
if info is None or not info.name2fixturedefs:
# This test item does not use any fixtures.
return
@@ -1449,7 +1447,7 @@ def _showfixtures_main(config: Config, session: Session) -> None:
fm = session._fixturemanager
available = []
seen = set() # type: Set[Tuple[str, str]]
seen: Set[Tuple[str, str]] = set()
for argname, fixturedefs in fm._arg2fixturedefs.items():
assert fixturedefs is not None
@@ -1590,7 +1588,7 @@ class Function(PyobjMixin, nodes.Item):
fixtureinfo = self.session._fixturemanager.getfixtureinfo(
self, self.obj, self.cls, funcargs=True
)
self._fixtureinfo = fixtureinfo # type: FuncFixtureInfo
self._fixtureinfo: FuncFixtureInfo = fixtureinfo
self.fixturenames = fixtureinfo.names_closure
self._initrequest()
@@ -1600,7 +1598,7 @@ class Function(PyobjMixin, nodes.Item):
return super().from_parent(parent=parent, **kw)
def _initrequest(self) -> None:
self.funcargs = {} # type: Dict[str, object]
self.funcargs: Dict[str, object] = {}
self._request = fixtures.FixtureRequest(self)
@property

View File

@@ -196,8 +196,8 @@ class ApproxScalar(ApproxBase):
# Using Real should be better than this Union, but not possible yet:
# https://github.com/python/typeshed/pull/3108
DEFAULT_ABSOLUTE_TOLERANCE = 1e-12 # type: Union[float, Decimal]
DEFAULT_RELATIVE_TOLERANCE = 1e-6 # type: Union[float, Decimal]
DEFAULT_ABSOLUTE_TOLERANCE: Union[float, Decimal] = 1e-12
DEFAULT_RELATIVE_TOLERANCE: Union[float, Decimal] = 1e-6
def __repr__(self) -> str:
"""Return a string communicating both the expected value and the
@@ -266,7 +266,7 @@ class ApproxScalar(ApproxBase):
return False
# Return true if the two numbers are within the tolerance.
result = abs(self.expected - actual) <= self.tolerance # type: bool
result: bool = abs(self.expected - actual) <= self.tolerance
return result
# Ignore type because of https://github.com/python/mypy/issues/4266.
@@ -517,7 +517,7 @@ def approx(expected, rel=None, abs=None, nan_ok: bool = False) -> ApproxBase:
__tracebackhide__ = True
if isinstance(expected, Decimal):
cls = ApproxDecimal # type: Type[ApproxBase]
cls: Type[ApproxBase] = ApproxDecimal
elif isinstance(expected, Mapping):
cls = ApproxMapping
elif _is_numpy_array(expected):
@@ -542,7 +542,7 @@ def _is_numpy_array(obj: object) -> bool:
"""
import sys
np = sys.modules.get("numpy") # type: Any
np: Any = sys.modules.get("numpy")
if np is not None:
return isinstance(obj, np.ndarray)
return False
@@ -687,7 +687,7 @@ def raises(
__tracebackhide__ = True
if isinstance(expected_exception, type):
excepted_exceptions = (expected_exception,) # type: Tuple[Type[_E], ...]
excepted_exceptions: Tuple[Type[_E], ...] = (expected_exception,)
else:
excepted_exceptions = expected_exception
for exc in excepted_exceptions:
@@ -699,7 +699,7 @@ def raises(
message = f"DID NOT RAISE {expected_exception}"
if not args:
match = kwargs.pop("match", None) # type: Optional[Union[str, Pattern[str]]]
match: Optional[Union[str, Pattern[str]]] = kwargs.pop("match", None)
if kwargs:
msg = "Unexpected keyword arguments passed to pytest.raises: "
msg += ", ".join(sorted(kwargs))
@@ -738,7 +738,7 @@ class RaisesContext(Generic[_E]):
self.expected_exception = expected_exception
self.message = message
self.match_expr = match_expr
self.excinfo = None # type: Optional[_pytest._code.ExceptionInfo[_E]]
self.excinfo: Optional[_pytest._code.ExceptionInfo[_E]] = None
def __enter__(self) -> _pytest._code.ExceptionInfo[_E]:
self.excinfo = _pytest._code.ExceptionInfo.for_later()

View File

@@ -163,7 +163,7 @@ class WarningsRecorder(warnings.catch_warnings):
# Type ignored due to the way typeshed handles warnings.catch_warnings.
super().__init__(record=True) # type: ignore[call-arg]
self._entered = False
self._list = [] # type: List[warnings.WarningMessage]
self._list: List[warnings.WarningMessage] = []
@property
def list(self) -> List["warnings.WarningMessage"]:

View File

@@ -58,13 +58,13 @@ _R = TypeVar("_R", bound="BaseReport")
class BaseReport:
when = None # type: Optional[str]
location = None # type: Optional[Tuple[str, Optional[int], str]]
longrepr = (
None
) # type: Union[None, ExceptionInfo[BaseException], Tuple[str, int, str], str, TerminalRepr]
sections = [] # type: List[Tuple[str, str]]
nodeid = None # type: str
when: Optional[str]
location: Optional[Tuple[str, Optional[int], str]]
longrepr: Union[
None, ExceptionInfo[BaseException], Tuple[str, int, str], str, TerminalRepr
]
sections: List[Tuple[str, str]]
nodeid: str
def __init__(self, **kw: Any) -> None:
self.__dict__.update(kw)
@@ -254,7 +254,7 @@ class TestReport(BaseReport):
#: A (filesystempath, lineno, domaininfo) tuple indicating the
#: actual location of a test item - it might be different from the
#: collected one e.g. if a method is inherited from a different module.
self.location = location # type: Tuple[str, Optional[int], str]
self.location: Tuple[str, Optional[int], str] = location
#: A name -> value dictionary containing all keywords and
#: markers associated with a test invocation.
@@ -300,10 +300,14 @@ class TestReport(BaseReport):
excinfo = call.excinfo
sections = []
if not call.excinfo:
outcome = "passed" # type: Literal["passed", "failed", "skipped"]
longrepr = (
None
) # type: Union[None, ExceptionInfo[BaseException], Tuple[str, int, str], str, TerminalRepr]
outcome: Literal["passed", "failed", "skipped"] = "passed"
longrepr: Union[
None,
ExceptionInfo[BaseException],
Tuple[str, int, str],
str,
TerminalRepr,
] = (None)
else:
if not isinstance(excinfo, ExceptionInfo):
outcome = "failed"
@@ -450,11 +454,11 @@ def _report_to_json(report: BaseReport) -> Dict[str, Any]:
assert rep.longrepr is not None
# TODO: Investigate whether the duck typing is really necessary here.
longrepr = cast(ExceptionRepr, rep.longrepr)
result = {
result: Dict[str, Any] = {
"reprcrash": serialize_repr_crash(longrepr.reprcrash),
"reprtraceback": serialize_repr_traceback(longrepr.reprtraceback),
"sections": longrepr.sections,
} # type: Dict[str, Any]
}
if isinstance(longrepr, ExceptionChainRepr):
result["chain"] = []
for repr_traceback, repr_crash, description in longrepr.chain:
@@ -508,13 +512,13 @@ def _report_kwargs_from_json(reportdict: Dict[str, Any]) -> Dict[str, Any]:
if data["reprlocals"]:
reprlocals = ReprLocals(data["reprlocals"]["lines"])
reprentry = ReprEntry(
reprentry: Union[ReprEntry, ReprEntryNative] = ReprEntry(
lines=data["lines"],
reprfuncargs=reprfuncargs,
reprlocals=reprlocals,
reprfileloc=reprfileloc,
style=data["style"],
) # type: Union[ReprEntry, ReprEntryNative]
)
elif entry_type == "ReprEntryNative":
reprentry = ReprEntryNative(data["lines"])
else:
@@ -555,9 +559,9 @@ def _report_kwargs_from_json(reportdict: Dict[str, Any]) -> Dict[str, Any]:
description,
)
)
exception_info = ExceptionChainRepr(
chain
) # type: Union[ExceptionChainRepr,ReprExceptionInfo]
exception_info: Union[
ExceptionChainRepr, ReprExceptionInfo
] = ExceptionChainRepr(chain)
else:
exception_info = ReprExceptionInfo(reprtraceback, reprcrash)

View File

@@ -215,7 +215,7 @@ def call_and_report(
) -> TestReport:
call = call_runtest_hook(item, when, **kwds)
hook = item.ihook
report = hook.pytest_runtest_makereport(item=item, call=call) # type: TestReport
report: TestReport = hook.pytest_runtest_makereport(item=item, call=call)
if log:
hook.pytest_runtest_logreport(report=report)
if check_interactive_exception(call, report):
@@ -242,14 +242,14 @@ def call_runtest_hook(
item: Item, when: "Literal['setup', 'call', 'teardown']", **kwds
) -> "CallInfo[None]":
if when == "setup":
ihook = item.ihook.pytest_runtest_setup # type: Callable[..., None]
ihook: Callable[..., None] = item.ihook.pytest_runtest_setup
elif when == "call":
ihook = item.ihook.pytest_runtest_call
elif when == "teardown":
ihook = item.ihook.pytest_runtest_teardown
else:
assert False, f"Unhandled runtest hook case: {when}"
reraise = (Exit,) # type: Tuple[Type[BaseException], ...]
reraise: Tuple[Type[BaseException], ...] = (Exit,)
if not item.config.getoption("usepdb", False):
reraise += (KeyboardInterrupt,)
return CallInfo.from_call(
@@ -309,7 +309,7 @@ class CallInfo(Generic[TResult]):
start = timing.time()
precise_start = timing.perf_counter()
try:
result = func() # type: Optional[TResult]
result: Optional[TResult] = func()
except BaseException:
excinfo = ExceptionInfo.from_current()
if reraise is not None and isinstance(excinfo.value, reraise):
@@ -340,9 +340,9 @@ def pytest_runtest_makereport(item: Item, call: CallInfo[None]) -> TestReport:
def pytest_make_collect_report(collector: Collector) -> CollectReport:
call = CallInfo.from_call(lambda: list(collector.collect()), "collect")
longrepr = None # type: Union[None, Tuple[str, int, str], str, TerminalRepr]
longrepr: Union[None, Tuple[str, int, str], str, TerminalRepr] = None
if not call.excinfo:
outcome = "passed" # type: Literal["passed", "skipped", "failed"]
outcome: Literal["passed", "skipped", "failed"] = "passed"
else:
skip_exceptions = [Skipped]
unittest = sys.modules.get("unittest")
@@ -373,8 +373,8 @@ class SetupState:
"""Shared state for setting up/tearing down test items or collectors."""
def __init__(self):
self.stack = [] # type: List[Node]
self._finalizers = {} # type: Dict[Node, List[Callable[[], object]]]
self.stack: List[Node] = []
self._finalizers: Dict[Node, List[Callable[[], object]]] = {}
def addfinalizer(self, finalizer: Callable[[], object], colitem) -> None:
"""Attach a finalizer to the given colitem."""
@@ -456,7 +456,7 @@ class SetupState:
def collect_one_node(collector: Collector) -> CollectReport:
ihook = collector.ihook
ihook.pytest_collectstart(collector=collector)
rep = ihook.pytest_make_collect_report(collector=collector) # type: CollectReport
rep: CollectReport = ihook.pytest_make_collect_report(collector=collector)
call = rep.__dict__.pop("call", None)
if call and check_interactive_exception(call, rep):
ihook.pytest_exception_interact(node=collector, call=call, report=rep)

View File

@@ -35,7 +35,7 @@ class StepwisePlugin:
def __init__(self, config: Config) -> None:
self.config = config
self.active = config.getvalue("stepwise")
self.session = None # type: Optional[Session]
self.session: Optional[Session] = None
self.report_status = ""
if self.active:

View File

@@ -83,7 +83,7 @@ class Store:
__slots__ = ("_store",)
def __init__(self) -> None:
self._store = {} # type: Dict[StoreKey[Any], object]
self._store: Dict[StoreKey[Any], object] = {}
def __setitem__(self, key: StoreKey[T], value: T) -> None:
"""Set a value for key."""

View File

@@ -235,7 +235,7 @@ def pytest_configure(config: Config) -> None:
def getreportopt(config: Config) -> str:
reportchars = config.option.reportchars # type: str
reportchars: str = config.option.reportchars
old_aliases = {"F", "S"}
reportopts = ""
@@ -267,7 +267,7 @@ def pytest_report_teststatus(report: BaseReport) -> Tuple[str, str, str]:
elif report.skipped:
letter = "s"
outcome = report.outcome # type: str
outcome: str = report.outcome
if report.when in ("collect", "setup", "teardown") and outcome == "failed":
outcome = "error"
letter = "E"
@@ -317,27 +317,27 @@ class TerminalReporter:
self.config = config
self._numcollected = 0
self._session = None # type: Optional[Session]
self._showfspath = None # type: Optional[bool]
self._session: Optional[Session] = None
self._showfspath: Optional[bool] = None
self.stats = {} # type: Dict[str, List[Any]]
self._main_color = None # type: Optional[str]
self._known_types = None # type: Optional[List[str]]
self.stats: Dict[str, List[Any]] = {}
self._main_color: Optional[str] = None
self._known_types: Optional[List[str]] = None
self.startdir = config.invocation_dir
self.startpath = config.invocation_params.dir
if file is None:
file = sys.stdout
self._tw = _pytest.config.create_terminal_writer(config, file)
self._screen_width = self._tw.fullwidth
self.currentfspath = None # type: Union[None, Path, str, int]
self.currentfspath: Union[None, Path, str, int] = None
self.reportchars = getreportopt(config)
self.hasmarkup = self._tw.hasmarkup
self.isatty = file.isatty()
self._progress_nodeids_reported = set() # type: Set[str]
self._progress_nodeids_reported: Set[str] = set()
self._show_progress_info = self._determine_show_progress_info()
self._collect_report_last_write = None # type: Optional[float]
self._already_displayed_warnings = None # type: Optional[int]
self._keyboardinterrupt_memo = None # type: Optional[ExceptionRepr]
self._collect_report_last_write: Optional[float] = None
self._already_displayed_warnings: Optional[int] = None
self._keyboardinterrupt_memo: Optional[ExceptionRepr] = None
def _determine_show_progress_info(self) -> "Literal['progress', 'count', False]":
"""Return whether we should display progress information based on the current config."""
@@ -347,7 +347,7 @@ class TerminalReporter:
# do not show progress if we are showing fixture setup/teardown
if self.config.getoption("setupshow", False):
return False
cfg = self.config.getini("console_output_style") # type: str
cfg: str = self.config.getini("console_output_style")
if cfg == "progress":
return "progress"
elif cfg == "count":
@@ -357,7 +357,7 @@ class TerminalReporter:
@property
def verbosity(self) -> int:
verbosity = self.config.option.verbose # type: int
verbosity: int = self.config.option.verbose
return verbosity
@property
@@ -512,9 +512,9 @@ class TerminalReporter:
def pytest_runtest_logreport(self, report: TestReport) -> None:
self._tests_ran = True
rep = report
res = self.config.hook.pytest_report_teststatus(
report=rep, config=self.config
) # type: Tuple[str, str, Union[str, Tuple[str, Mapping[str, bool]]]]
res: Tuple[
str, str, Union[str, Tuple[str, Mapping[str, bool]]]
] = self.config.hook.pytest_report_teststatus(report=rep, config=self.config)
category, letter, word = res
if not isinstance(word, tuple):
markup = None
@@ -718,7 +718,7 @@ class TerminalReporter:
if config.inipath:
line += ", configfile: " + bestrelpath(config.rootpath, config.inipath)
testpaths = config.getini("testpaths") # type: List[str]
testpaths: List[str] = config.getini("testpaths")
if config.invocation_params.dir == config.rootpath and config.args == testpaths:
line += ", testpaths: {}".format(", ".join(testpaths))
@@ -755,7 +755,7 @@ class TerminalReporter:
# because later versions are going to get rid of them anyway.
if self.config.option.verbose < 0:
if self.config.option.verbose < -1:
counts = {} # type: Dict[str, int]
counts: Dict[str, int] = {}
for item in items:
name = item.nodeid.split("::", 1)[0]
counts[name] = counts.get(name, 0) + 1
@@ -765,7 +765,7 @@ class TerminalReporter:
for item in items:
self._tw.line(item.nodeid)
return
stack = [] # type: List[Node]
stack: List[Node] = []
indent = ""
for item in items:
needed_collectors = item.listchain()[1:] # strip root node
@@ -896,9 +896,7 @@ class TerminalReporter:
def summary_warnings(self) -> None:
if self.hasopt("w"):
all_warnings = self.stats.get(
"warnings"
) # type: Optional[List[WarningReport]]
all_warnings: Optional[List[WarningReport]] = self.stats.get("warnings")
if not all_warnings:
return
@@ -911,9 +909,9 @@ class TerminalReporter:
if not warning_reports:
return
reports_grouped_by_message = (
reports_grouped_by_message: Dict[str, List[WarningReport]] = (
order_preserving_dict()
) # type: Dict[str, List[WarningReport]]
)
for wr in warning_reports:
reports_grouped_by_message.setdefault(wr.message, []).append(wr)
@@ -927,7 +925,7 @@ class TerminalReporter:
if len(locations) < 10:
return "\n".join(map(str, locations))
counts_by_filename = order_preserving_dict() # type: Dict[str, int]
counts_by_filename: Dict[str, int] = order_preserving_dict()
for loc in locations:
key = str(loc).split("::", 1)[0]
counts_by_filename[key] = counts_by_filename.get(key, 0) + 1
@@ -954,7 +952,7 @@ class TerminalReporter:
def summary_passes(self) -> None:
if self.config.option.tbstyle != "no":
if self.hasopt("P"):
reports = self.getreports("passed") # type: List[TestReport]
reports: List[TestReport] = self.getreports("passed")
if not reports:
return
self.write_sep("=", "PASSES")
@@ -992,7 +990,7 @@ class TerminalReporter:
def summary_failures(self) -> None:
if self.config.option.tbstyle != "no":
reports = self.getreports("failed") # type: List[BaseReport]
reports: List[BaseReport] = self.getreports("failed")
if not reports:
return
self.write_sep("=", "FAILURES")
@@ -1009,7 +1007,7 @@ class TerminalReporter:
def summary_errors(self) -> None:
if self.config.option.tbstyle != "no":
reports = self.getreports("error") # type: List[BaseReport]
reports: List[BaseReport] = self.getreports("error")
if not reports:
return
self.write_sep("=", "ERRORS")
@@ -1105,7 +1103,7 @@ class TerminalReporter:
lines.append(f"{verbose_word} {pos} {reason}")
def show_skipped(lines: List[str]) -> None:
skipped = self.stats.get("skipped", []) # type: List[CollectReport]
skipped: List[CollectReport] = self.stats.get("skipped", [])
fskips = _folded_skips(self.startpath, skipped) if skipped else []
if not fskips:
return
@@ -1121,16 +1119,16 @@ class TerminalReporter:
else:
lines.append("%s [%d] %s: %s" % (verbose_word, num, fspath, reason))
REPORTCHAR_ACTIONS = {
REPORTCHAR_ACTIONS: Mapping[str, Callable[[List[str]], None]] = {
"x": show_xfailed,
"X": show_xpassed,
"f": partial(show_simple, "failed"),
"s": show_skipped,
"p": partial(show_simple, "passed"),
"E": partial(show_simple, "error"),
} # type: Mapping[str, Callable[[List[str]], None]]
}
lines = [] # type: List[str]
lines: List[str] = []
for char in self.reportchars:
action = REPORTCHAR_ACTIONS.get(char)
if action: # skipping e.g. "P" (passed with output) here.
@@ -1161,7 +1159,7 @@ class TerminalReporter:
return main_color
def _set_main_color(self) -> None:
unknown_types = [] # type: List[str]
unknown_types: List[str] = []
for found_type in self.stats.keys():
if found_type: # setup/teardown reports have an empty key, ignore them
if found_type not in KNOWN_TYPES and found_type not in unknown_types:
@@ -1236,7 +1234,7 @@ def _get_line_with_reprcrash_message(
def _folded_skips(
startpath: Path, skipped: Sequence[CollectReport],
) -> List[Tuple[int, str, Optional[int], str]]:
d = {} # type: Dict[Tuple[str, Optional[int], str], List[CollectReport]]
d: Dict[Tuple[str, Optional[int], str], List[CollectReport]] = {}
for event in skipped:
assert event.longrepr is not None
assert isinstance(event.longrepr, tuple), (event, event.longrepr)
@@ -1253,11 +1251,11 @@ def _folded_skips(
and "skip" in keywords
and "pytestmark" not in keywords
):
key = (fspath, None, reason) # type: Tuple[str, Optional[int], str]
key: Tuple[str, Optional[int], str] = (fspath, None, reason)
else:
key = (fspath, lineno, reason)
d.setdefault(key, []).append(event)
values = [] # type: List[Tuple[int, str, Optional[int], str]]
values: List[Tuple[int, str, Optional[int], str]] = []
for key, events in d.items():
values.append((len(events), *key))
return values
@@ -1286,7 +1284,7 @@ def _make_plural(count: int, noun: str) -> Tuple[int, str]:
def _plugin_nameversions(plugininfo) -> List[str]:
values = [] # type: List[str]
values: List[str] = []
for plugin, dist in plugininfo:
# Gets us name and version!
name = "{dist.project_name}-{dist.version}".format(dist=dist)

View File

@@ -55,7 +55,7 @@ def pytest_pycollect_makeitem(
except Exception:
return None
# Yes, so let's collect it.
item = UnitTestCase.from_parent(collector, name=name, obj=obj) # type: UnitTestCase
item: UnitTestCase = UnitTestCase.from_parent(collector, name=name, obj=obj)
return item
@@ -141,12 +141,12 @@ def _make_xunit_fixture(
class TestCaseFunction(Function):
nofuncargs = True
_excinfo = None # type: Optional[List[_pytest._code.ExceptionInfo[BaseException]]]
_testcase = None # type: Optional[unittest.TestCase]
_excinfo: Optional[List[_pytest._code.ExceptionInfo[BaseException]]] = None
_testcase: Optional["unittest.TestCase"] = None
def setup(self) -> None:
# A bound method to be called during teardown() if set (see 'runtest()').
self._explicit_tearDown = None # type: Optional[Callable[[], None]]
self._explicit_tearDown: Optional[Callable[[], None]] = None
assert self.parent is not None
self._testcase = self.parent.obj(self.name) # type: ignore[attr-defined]
self._obj = getattr(self._testcase, self.name)
@@ -320,7 +320,7 @@ def pytest_runtest_makereport(item: Item, call: CallInfo[None]) -> None:
@hookimpl(hookwrapper=True)
def pytest_runtest_protocol(item: Item) -> Generator[None, None, None]:
if isinstance(item, TestCaseFunction) and "twisted.trial.unittest" in sys.modules:
ut = sys.modules["twisted.python.failure"] # type: Any
ut: Any = sys.modules["twisted.python.failure"]
Failure__init__ = ut.Failure.__init__
check_testcase_implements_trial_reporter()