skipping: refactor skipif/xfail mark evaluation
Previously, skipif/xfail marks were evaluated using a `MarkEvaluator` class. I found this class very difficult to understand. Instead of `MarkEvaluator`, rewrite using straight functions which are hopefully easier to follow. I tried to keep the semantics exactly as before, except improving a few error messages.
This commit is contained in:
@@ -3,12 +3,13 @@ import os
|
||||
import platform
|
||||
import sys
|
||||
import traceback
|
||||
from typing import Any
|
||||
from typing import Dict
|
||||
from typing import List
|
||||
from typing import Optional
|
||||
from typing import Tuple
|
||||
|
||||
import attr
|
||||
|
||||
import _pytest._code
|
||||
from _pytest.compat import TYPE_CHECKING
|
||||
from _pytest.config import Config
|
||||
from _pytest.config import hookimpl
|
||||
from _pytest.config.argparsing import Parser
|
||||
@@ -16,12 +17,14 @@ from _pytest.mark.structures import Mark
|
||||
from _pytest.nodes import Item
|
||||
from _pytest.outcomes import fail
|
||||
from _pytest.outcomes import skip
|
||||
from _pytest.outcomes import TEST_OUTCOME
|
||||
from _pytest.outcomes import xfail
|
||||
from _pytest.reports import BaseReport
|
||||
from _pytest.runner import CallInfo
|
||||
from _pytest.store import StoreKey
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from typing import Type
|
||||
|
||||
|
||||
def pytest_addoption(parser: Parser) -> None:
|
||||
group = parser.getgroup("general")
|
||||
@@ -64,17 +67,16 @@ def pytest_configure(config: Config) -> None:
|
||||
)
|
||||
config.addinivalue_line(
|
||||
"markers",
|
||||
"skipif(condition): skip the given test function if eval(condition) "
|
||||
"results in a True value. Evaluation happens within the "
|
||||
"module global context. Example: skipif('sys.platform == \"win32\"') "
|
||||
"skips the test if we are on the win32 platform. see "
|
||||
"https://docs.pytest.org/en/latest/skipping.html",
|
||||
"skipif(condition, ..., *, reason=...): "
|
||||
"skip the given test function if any of the conditions evaluate to True. "
|
||||
"Example: skipif(sys.platform == 'win32') skips the test if we are on the win32 platform. "
|
||||
"see https://docs.pytest.org/en/latest/skipping.html",
|
||||
)
|
||||
config.addinivalue_line(
|
||||
"markers",
|
||||
"xfail(condition, reason=None, run=True, raises=None, strict=False): "
|
||||
"mark the test function as an expected failure if eval(condition) "
|
||||
"has a True value. Optionally specify a reason for better reporting "
|
||||
"xfail(condition, ..., *, reason=..., run=True, raises=None, strict=xfail_strict): "
|
||||
"mark the test function as an expected failure if any of the conditions "
|
||||
"evaluate to True. Optionally specify a reason for better reporting "
|
||||
"and run=False if you don't even want to execute the test function. "
|
||||
"If only specific exception(s) are expected, you can list them in "
|
||||
"raises, and if the test fails in other ways, it will be reported as "
|
||||
@@ -82,179 +84,191 @@ def pytest_configure(config: Config) -> None:
|
||||
)
|
||||
|
||||
|
||||
def compiled_eval(expr: str, d: Dict[str, object]) -> Any:
|
||||
import _pytest._code
|
||||
def evaluate_condition(item: Item, mark: Mark, condition: object) -> Tuple[bool, str]:
|
||||
"""Evaluate a single skipif/xfail condition.
|
||||
|
||||
exprcode = _pytest._code.compile(expr, mode="eval")
|
||||
return eval(exprcode, d)
|
||||
If an old-style string condition is given, it is eval()'d, otherwise the
|
||||
condition is bool()'d. If this fails, an appropriately formatted pytest.fail
|
||||
is raised.
|
||||
|
||||
|
||||
class MarkEvaluator:
|
||||
def __init__(self, item: Item, name: str) -> None:
|
||||
self.item = item
|
||||
self._marks = None # type: Optional[List[Mark]]
|
||||
self._mark = None # type: Optional[Mark]
|
||||
self._mark_name = name
|
||||
|
||||
def __bool__(self) -> bool:
|
||||
# don't cache here to prevent staleness
|
||||
return bool(self._get_marks())
|
||||
|
||||
def wasvalid(self) -> bool:
|
||||
return not hasattr(self, "exc")
|
||||
|
||||
def _get_marks(self) -> List[Mark]:
|
||||
return list(self.item.iter_markers(name=self._mark_name))
|
||||
|
||||
def invalidraise(self, exc) -> Optional[bool]:
|
||||
raises = self.get("raises")
|
||||
if not raises:
|
||||
return None
|
||||
return not isinstance(exc, raises)
|
||||
|
||||
def istrue(self) -> bool:
|
||||
Returns (result, reason). The reason is only relevant if the result is True.
|
||||
"""
|
||||
# String condition.
|
||||
if isinstance(condition, str):
|
||||
globals_ = {
|
||||
"os": os,
|
||||
"sys": sys,
|
||||
"platform": platform,
|
||||
"config": item.config,
|
||||
}
|
||||
if hasattr(item, "obj"):
|
||||
globals_.update(item.obj.__globals__) # type: ignore[attr-defined]
|
||||
try:
|
||||
return self._istrue()
|
||||
except TEST_OUTCOME:
|
||||
self.exc = sys.exc_info()
|
||||
if isinstance(self.exc[1], SyntaxError):
|
||||
# TODO: Investigate why SyntaxError.offset is Optional, and if it can be None here.
|
||||
assert self.exc[1].offset is not None
|
||||
msg = [" " * (self.exc[1].offset + 4) + "^"]
|
||||
msg.append("SyntaxError: invalid syntax")
|
||||
else:
|
||||
msg = traceback.format_exception_only(*self.exc[:2])
|
||||
fail(
|
||||
"Error evaluating %r expression\n"
|
||||
" %s\n"
|
||||
"%s" % (self._mark_name, self.expr, "\n".join(msg)),
|
||||
pytrace=False,
|
||||
condition_code = _pytest._code.compile(condition, mode="eval")
|
||||
result = eval(condition_code, globals_)
|
||||
except SyntaxError as exc:
|
||||
msglines = [
|
||||
"Error evaluating %r condition" % mark.name,
|
||||
" " + condition,
|
||||
" " + " " * (exc.offset or 0) + "^",
|
||||
"SyntaxError: invalid syntax",
|
||||
]
|
||||
fail("\n".join(msglines), pytrace=False)
|
||||
except Exception as exc:
|
||||
msglines = [
|
||||
"Error evaluating %r condition" % mark.name,
|
||||
" " + condition,
|
||||
*traceback.format_exception_only(type(exc), exc),
|
||||
]
|
||||
fail("\n".join(msglines), pytrace=False)
|
||||
|
||||
# Boolean condition.
|
||||
else:
|
||||
try:
|
||||
result = bool(condition)
|
||||
except Exception as exc:
|
||||
msglines = [
|
||||
"Error evaluating %r condition as a boolean" % mark.name,
|
||||
*traceback.format_exception_only(type(exc), exc),
|
||||
]
|
||||
fail("\n".join(msglines), pytrace=False)
|
||||
|
||||
reason = mark.kwargs.get("reason", None)
|
||||
if reason is None:
|
||||
if isinstance(condition, str):
|
||||
reason = "condition: " + condition
|
||||
else:
|
||||
# XXX better be checked at collection time
|
||||
msg = (
|
||||
"Error evaluating %r: " % mark.name
|
||||
+ "you need to specify reason=STRING when using booleans as conditions."
|
||||
)
|
||||
fail(msg, pytrace=False)
|
||||
|
||||
def _getglobals(self) -> Dict[str, object]:
|
||||
d = {"os": os, "sys": sys, "platform": platform, "config": self.item.config}
|
||||
if hasattr(self.item, "obj"):
|
||||
d.update(self.item.obj.__globals__) # type: ignore[attr-defined] # noqa: F821
|
||||
return d
|
||||
|
||||
def _istrue(self) -> bool:
|
||||
if hasattr(self, "result"):
|
||||
result = getattr(self, "result") # type: bool
|
||||
return result
|
||||
self._marks = self._get_marks()
|
||||
|
||||
if self._marks:
|
||||
self.result = False
|
||||
for mark in self._marks:
|
||||
self._mark = mark
|
||||
if "condition" not in mark.kwargs:
|
||||
args = mark.args
|
||||
else:
|
||||
args = (mark.kwargs["condition"],)
|
||||
|
||||
for expr in args:
|
||||
self.expr = expr
|
||||
if isinstance(expr, str):
|
||||
d = self._getglobals()
|
||||
result = compiled_eval(expr, d)
|
||||
else:
|
||||
if "reason" not in mark.kwargs:
|
||||
# XXX better be checked at collection time
|
||||
msg = (
|
||||
"you need to specify reason=STRING "
|
||||
"when using booleans as conditions."
|
||||
)
|
||||
fail(msg)
|
||||
result = bool(expr)
|
||||
if result:
|
||||
self.result = True
|
||||
self.reason = mark.kwargs.get("reason", None)
|
||||
self.expr = expr
|
||||
return self.result
|
||||
|
||||
if not args:
|
||||
self.result = True
|
||||
self.reason = mark.kwargs.get("reason", None)
|
||||
return self.result
|
||||
return False
|
||||
|
||||
def get(self, attr, default=None):
|
||||
if self._mark is None:
|
||||
return default
|
||||
return self._mark.kwargs.get(attr, default)
|
||||
|
||||
def getexplanation(self):
|
||||
expl = getattr(self, "reason", None) or self.get("reason", None)
|
||||
if not expl:
|
||||
if not hasattr(self, "expr"):
|
||||
return ""
|
||||
else:
|
||||
return "condition: " + str(self.expr)
|
||||
return expl
|
||||
return result, reason
|
||||
|
||||
|
||||
@attr.s(slots=True, frozen=True)
|
||||
class Skip:
|
||||
"""The result of evaluate_skip_marks()."""
|
||||
|
||||
reason = attr.ib(type=str)
|
||||
|
||||
|
||||
def evaluate_skip_marks(item: Item) -> Optional[Skip]:
|
||||
"""Evaluate skip and skipif marks on item, returning Skip if triggered."""
|
||||
for mark in item.iter_markers(name="skipif"):
|
||||
if "condition" not in mark.kwargs:
|
||||
conditions = mark.args
|
||||
else:
|
||||
conditions = (mark.kwargs["condition"],)
|
||||
|
||||
# Unconditional.
|
||||
if not conditions:
|
||||
reason = mark.kwargs.get("reason", "")
|
||||
return Skip(reason)
|
||||
|
||||
# If any of the conditions are true.
|
||||
for condition in conditions:
|
||||
result, reason = evaluate_condition(item, mark, condition)
|
||||
if result:
|
||||
return Skip(reason)
|
||||
|
||||
for mark in item.iter_markers(name="skip"):
|
||||
if "reason" in mark.kwargs:
|
||||
reason = mark.kwargs["reason"]
|
||||
elif mark.args:
|
||||
reason = mark.args[0]
|
||||
else:
|
||||
reason = "unconditional skip"
|
||||
return Skip(reason)
|
||||
|
||||
return None
|
||||
|
||||
|
||||
@attr.s(slots=True, frozen=True)
|
||||
class Xfail:
|
||||
"""The result of evaluate_xfail_marks()."""
|
||||
|
||||
reason = attr.ib(type=str)
|
||||
run = attr.ib(type=bool)
|
||||
strict = attr.ib(type=bool)
|
||||
raises = attr.ib(type=Optional[Tuple["Type[BaseException]", ...]])
|
||||
|
||||
|
||||
def evaluate_xfail_marks(item: Item) -> Optional[Xfail]:
|
||||
"""Evaluate xfail marks on item, returning Xfail if triggered."""
|
||||
for mark in item.iter_markers(name="xfail"):
|
||||
run = mark.kwargs.get("run", True)
|
||||
strict = mark.kwargs.get("strict", item.config.getini("xfail_strict"))
|
||||
raises = mark.kwargs.get("raises", None)
|
||||
if "condition" not in mark.kwargs:
|
||||
conditions = mark.args
|
||||
else:
|
||||
conditions = (mark.kwargs["condition"],)
|
||||
|
||||
# Unconditional.
|
||||
if not conditions:
|
||||
reason = mark.kwargs.get("reason", "")
|
||||
return Xfail(reason, run, strict, raises)
|
||||
|
||||
# If any of the conditions are true.
|
||||
for condition in conditions:
|
||||
result, reason = evaluate_condition(item, mark, condition)
|
||||
if result:
|
||||
return Xfail(reason, run, strict, raises)
|
||||
|
||||
return None
|
||||
|
||||
|
||||
# Whether skipped due to skip or skipif marks.
|
||||
skipped_by_mark_key = StoreKey[bool]()
|
||||
evalxfail_key = StoreKey[MarkEvaluator]()
|
||||
# Saves the xfail mark evaluation. Can be refreshed during call if None.
|
||||
xfailed_key = StoreKey[Optional[Xfail]]()
|
||||
unexpectedsuccess_key = StoreKey[str]()
|
||||
|
||||
|
||||
@hookimpl(tryfirst=True)
|
||||
def pytest_runtest_setup(item: Item) -> None:
|
||||
# Check if skip or skipif are specified as pytest marks
|
||||
item._store[skipped_by_mark_key] = False
|
||||
eval_skipif = MarkEvaluator(item, "skipif")
|
||||
if eval_skipif.istrue():
|
||||
item._store[skipped_by_mark_key] = True
|
||||
skip(eval_skipif.getexplanation())
|
||||
|
||||
for skip_info in item.iter_markers(name="skip"):
|
||||
skipped = evaluate_skip_marks(item)
|
||||
if skipped:
|
||||
item._store[skipped_by_mark_key] = True
|
||||
if "reason" in skip_info.kwargs:
|
||||
skip(skip_info.kwargs["reason"])
|
||||
elif skip_info.args:
|
||||
skip(skip_info.args[0])
|
||||
else:
|
||||
skip("unconditional skip")
|
||||
skip(skipped.reason)
|
||||
|
||||
item._store[evalxfail_key] = MarkEvaluator(item, "xfail")
|
||||
check_xfail_no_run(item)
|
||||
if not item.config.option.runxfail:
|
||||
item._store[xfailed_key] = xfailed = evaluate_xfail_marks(item)
|
||||
if xfailed and not xfailed.run:
|
||||
xfail("[NOTRUN] " + xfailed.reason)
|
||||
|
||||
|
||||
@hookimpl(hookwrapper=True)
|
||||
def pytest_runtest_call(item: Item):
|
||||
check_xfail_no_run(item)
|
||||
if not item.config.option.runxfail:
|
||||
xfailed = item._store.get(xfailed_key, None)
|
||||
if xfailed is None:
|
||||
item._store[xfailed_key] = xfailed = evaluate_xfail_marks(item)
|
||||
if xfailed and not xfailed.run:
|
||||
xfail("[NOTRUN] " + xfailed.reason)
|
||||
|
||||
outcome = yield
|
||||
passed = outcome.excinfo is None
|
||||
|
||||
if passed:
|
||||
check_strict_xfail(item)
|
||||
|
||||
|
||||
def check_xfail_no_run(item: Item) -> None:
|
||||
"""check xfail(run=False)"""
|
||||
if not item.config.option.runxfail:
|
||||
evalxfail = item._store[evalxfail_key]
|
||||
if evalxfail.istrue():
|
||||
if not evalxfail.get("run", True):
|
||||
xfail("[NOTRUN] " + evalxfail.getexplanation())
|
||||
|
||||
|
||||
def check_strict_xfail(item: Item) -> None:
|
||||
"""check xfail(strict=True) for the given PASSING test"""
|
||||
evalxfail = item._store[evalxfail_key]
|
||||
if evalxfail.istrue():
|
||||
strict_default = item.config.getini("xfail_strict")
|
||||
is_strict_xfail = evalxfail.get("strict", strict_default)
|
||||
if is_strict_xfail:
|
||||
del item._store[evalxfail_key]
|
||||
explanation = evalxfail.getexplanation()
|
||||
fail("[XPASS(strict)] " + explanation, pytrace=False)
|
||||
xfailed = item._store.get(xfailed_key, None)
|
||||
if xfailed is None:
|
||||
item._store[xfailed_key] = xfailed = evaluate_xfail_marks(item)
|
||||
if xfailed and xfailed.strict:
|
||||
del item._store[xfailed_key]
|
||||
fail("[XPASS(strict)] " + xfailed.reason, pytrace=False)
|
||||
|
||||
|
||||
@hookimpl(hookwrapper=True)
|
||||
def pytest_runtest_makereport(item: Item, call: CallInfo[None]):
|
||||
outcome = yield
|
||||
rep = outcome.get_result()
|
||||
evalxfail = item._store.get(evalxfail_key, None)
|
||||
xfailed = item._store.get(xfailed_key, None)
|
||||
# unittest special case, see setting of unexpectedsuccess_key
|
||||
if unexpectedsuccess_key in item._store and rep.when == "call":
|
||||
reason = item._store[unexpectedsuccess_key]
|
||||
@@ -263,30 +277,27 @@ def pytest_runtest_makereport(item: Item, call: CallInfo[None]):
|
||||
else:
|
||||
rep.longrepr = "Unexpected success"
|
||||
rep.outcome = "failed"
|
||||
|
||||
elif item.config.option.runxfail:
|
||||
pass # don't interfere
|
||||
elif call.excinfo and isinstance(call.excinfo.value, xfail.Exception):
|
||||
assert call.excinfo.value.msg is not None
|
||||
rep.wasxfail = "reason: " + call.excinfo.value.msg
|
||||
rep.outcome = "skipped"
|
||||
elif evalxfail and not rep.skipped and evalxfail.wasvalid() and evalxfail.istrue():
|
||||
elif not rep.skipped and xfailed:
|
||||
if call.excinfo:
|
||||
if evalxfail.invalidraise(call.excinfo.value):
|
||||
raises = xfailed.raises
|
||||
if raises is not None and not isinstance(call.excinfo.value, raises):
|
||||
rep.outcome = "failed"
|
||||
else:
|
||||
rep.outcome = "skipped"
|
||||
rep.wasxfail = evalxfail.getexplanation()
|
||||
rep.wasxfail = xfailed.reason
|
||||
elif call.when == "call":
|
||||
strict_default = item.config.getini("xfail_strict")
|
||||
is_strict_xfail = evalxfail.get("strict", strict_default)
|
||||
explanation = evalxfail.getexplanation()
|
||||
if is_strict_xfail:
|
||||
if xfailed.strict:
|
||||
rep.outcome = "failed"
|
||||
rep.longrepr = "[XPASS(strict)] {}".format(explanation)
|
||||
rep.longrepr = "[XPASS(strict)] " + xfailed.reason
|
||||
else:
|
||||
rep.outcome = "passed"
|
||||
rep.wasxfail = explanation
|
||||
rep.wasxfail = xfailed.reason
|
||||
elif (
|
||||
item._store.get(skipped_by_mark_key, True)
|
||||
and rep.skipped
|
||||
@@ -301,9 +312,6 @@ def pytest_runtest_makereport(item: Item, call: CallInfo[None]):
|
||||
rep.longrepr = str(filename), line + 1, reason
|
||||
|
||||
|
||||
# called by terminalreporter progress reporting
|
||||
|
||||
|
||||
def pytest_report_teststatus(report: BaseReport) -> Optional[Tuple[str, str, str]]:
|
||||
if hasattr(report, "wasxfail"):
|
||||
if report.skipped:
|
||||
|
||||
Reference in New Issue
Block a user