500 lines
		
	
	
		
			16 KiB
		
	
	
	
		
			Python
		
	
	
	
			
		
		
	
	
			500 lines
		
	
	
		
			16 KiB
		
	
	
	
		
			Python
		
	
	
	
from io import StringIO
 | 
						|
from pprint import pprint
 | 
						|
from typing import List
 | 
						|
from typing import Optional
 | 
						|
from typing import Tuple
 | 
						|
from typing import Union
 | 
						|
 | 
						|
import py
 | 
						|
 | 
						|
from _pytest._code.code import ExceptionChainRepr
 | 
						|
from _pytest._code.code import ExceptionInfo
 | 
						|
from _pytest._code.code import ReprEntry
 | 
						|
from _pytest._code.code import ReprEntryNative
 | 
						|
from _pytest._code.code import ReprExceptionInfo
 | 
						|
from _pytest._code.code import ReprFileLocation
 | 
						|
from _pytest._code.code import ReprFuncArgs
 | 
						|
from _pytest._code.code import ReprLocals
 | 
						|
from _pytest._code.code import ReprTraceback
 | 
						|
from _pytest._code.code import TerminalRepr
 | 
						|
from _pytest.nodes import Node
 | 
						|
from _pytest.outcomes import skip
 | 
						|
from _pytest.pathlib import Path
 | 
						|
 | 
						|
 | 
						|
def getslaveinfoline(node):
 | 
						|
    try:
 | 
						|
        return node._slaveinfocache
 | 
						|
    except AttributeError:
 | 
						|
        d = node.slaveinfo
 | 
						|
        ver = "%s.%s.%s" % d["version_info"][:3]
 | 
						|
        node._slaveinfocache = s = "[{}] {} -- Python {} {}".format(
 | 
						|
            d["id"], d["sysplatform"], ver, d["executable"]
 | 
						|
        )
 | 
						|
        return s
 | 
						|
 | 
						|
 | 
						|
class BaseReport:
 | 
						|
    when = None  # type: Optional[str]
 | 
						|
    location = None  # type: Optional[Tuple[str, Optional[int], str]]
 | 
						|
    longrepr = None
 | 
						|
    sections = []  # type: List[Tuple[str, str]]
 | 
						|
    nodeid = None  # type: str
 | 
						|
 | 
						|
    def __init__(self, **kw):
 | 
						|
        self.__dict__.update(kw)
 | 
						|
 | 
						|
    def toterminal(self, out) -> None:
 | 
						|
        if hasattr(self, "node"):
 | 
						|
            out.line(getslaveinfoline(self.node))  # type: ignore
 | 
						|
 | 
						|
        longrepr = self.longrepr
 | 
						|
        if longrepr is None:
 | 
						|
            return
 | 
						|
 | 
						|
        if hasattr(longrepr, "toterminal"):
 | 
						|
            longrepr.toterminal(out)
 | 
						|
        else:
 | 
						|
            try:
 | 
						|
                out.line(longrepr)
 | 
						|
            except UnicodeEncodeError:
 | 
						|
                out.line("<unprintable longrepr>")
 | 
						|
 | 
						|
    def get_sections(self, prefix):
 | 
						|
        for name, content in self.sections:
 | 
						|
            if name.startswith(prefix):
 | 
						|
                yield prefix, content
 | 
						|
 | 
						|
    @property
 | 
						|
    def longreprtext(self):
 | 
						|
        """
 | 
						|
        Read-only property that returns the full string representation
 | 
						|
        of ``longrepr``.
 | 
						|
 | 
						|
        .. versionadded:: 3.0
 | 
						|
        """
 | 
						|
        tw = py.io.TerminalWriter(stringio=True)
 | 
						|
        tw.hasmarkup = False
 | 
						|
        self.toterminal(tw)
 | 
						|
        exc = tw.stringio.getvalue()
 | 
						|
        return exc.strip()
 | 
						|
 | 
						|
    @property
 | 
						|
    def caplog(self):
 | 
						|
        """Return captured log lines, if log capturing is enabled
 | 
						|
 | 
						|
        .. versionadded:: 3.5
 | 
						|
        """
 | 
						|
        return "\n".join(
 | 
						|
            content for (prefix, content) in self.get_sections("Captured log")
 | 
						|
        )
 | 
						|
 | 
						|
    @property
 | 
						|
    def capstdout(self):
 | 
						|
        """Return captured text from stdout, if capturing is enabled
 | 
						|
 | 
						|
        .. versionadded:: 3.0
 | 
						|
        """
 | 
						|
        return "".join(
 | 
						|
            content for (prefix, content) in self.get_sections("Captured stdout")
 | 
						|
        )
 | 
						|
 | 
						|
    @property
 | 
						|
    def capstderr(self):
 | 
						|
        """Return captured text from stderr, if capturing is enabled
 | 
						|
 | 
						|
        .. versionadded:: 3.0
 | 
						|
        """
 | 
						|
        return "".join(
 | 
						|
            content for (prefix, content) in self.get_sections("Captured stderr")
 | 
						|
        )
 | 
						|
 | 
						|
    passed = property(lambda x: x.outcome == "passed")
 | 
						|
    failed = property(lambda x: x.outcome == "failed")
 | 
						|
    skipped = property(lambda x: x.outcome == "skipped")
 | 
						|
 | 
						|
    @property
 | 
						|
    def fspath(self) -> str:
 | 
						|
        return self.nodeid.split("::")[0]
 | 
						|
 | 
						|
    @property
 | 
						|
    def count_towards_summary(self):
 | 
						|
        """
 | 
						|
        **Experimental**
 | 
						|
 | 
						|
        Returns True if this report should be counted towards the totals shown at the end of the
 | 
						|
        test session: "1 passed, 1 failure, etc".
 | 
						|
 | 
						|
        .. note::
 | 
						|
 | 
						|
            This function is considered **experimental**, so beware that it is subject to changes
 | 
						|
            even in patch releases.
 | 
						|
        """
 | 
						|
        return True
 | 
						|
 | 
						|
    @property
 | 
						|
    def head_line(self):
 | 
						|
        """
 | 
						|
        **Experimental**
 | 
						|
 | 
						|
        Returns the head line shown with longrepr output for this report, more commonly during
 | 
						|
        traceback representation during failures::
 | 
						|
 | 
						|
            ________ Test.foo ________
 | 
						|
 | 
						|
 | 
						|
        In the example above, the head_line is "Test.foo".
 | 
						|
 | 
						|
        .. note::
 | 
						|
 | 
						|
            This function is considered **experimental**, so beware that it is subject to changes
 | 
						|
            even in patch releases.
 | 
						|
        """
 | 
						|
        if self.location is not None:
 | 
						|
            fspath, lineno, domain = self.location
 | 
						|
            return domain
 | 
						|
 | 
						|
    def _get_verbose_word(self, config):
 | 
						|
        _category, _short, verbose = config.hook.pytest_report_teststatus(
 | 
						|
            report=self, config=config
 | 
						|
        )
 | 
						|
        return verbose
 | 
						|
 | 
						|
    def _to_json(self):
 | 
						|
        """
 | 
						|
        This was originally the serialize_report() function from xdist (ca03269).
 | 
						|
 | 
						|
        Returns the contents of this report as a dict of builtin entries, suitable for
 | 
						|
        serialization.
 | 
						|
 | 
						|
        Experimental method.
 | 
						|
        """
 | 
						|
        return _report_to_json(self)
 | 
						|
 | 
						|
    @classmethod
 | 
						|
    def _from_json(cls, reportdict):
 | 
						|
        """
 | 
						|
        This was originally the serialize_report() function from xdist (ca03269).
 | 
						|
 | 
						|
        Factory method that returns either a TestReport or CollectReport, depending on the calling
 | 
						|
        class. It's the callers responsibility to know which class to pass here.
 | 
						|
 | 
						|
        Experimental method.
 | 
						|
        """
 | 
						|
        kwargs = _report_kwargs_from_json(reportdict)
 | 
						|
        return cls(**kwargs)
 | 
						|
 | 
						|
 | 
						|
def _report_unserialization_failure(type_name, report_class, reportdict):
 | 
						|
    url = "https://github.com/pytest-dev/pytest/issues"
 | 
						|
    stream = StringIO()
 | 
						|
    pprint("-" * 100, stream=stream)
 | 
						|
    pprint("INTERNALERROR: Unknown entry type returned: %s" % type_name, stream=stream)
 | 
						|
    pprint("report_name: %s" % report_class, stream=stream)
 | 
						|
    pprint(reportdict, stream=stream)
 | 
						|
    pprint("Please report this bug at %s" % url, stream=stream)
 | 
						|
    pprint("-" * 100, stream=stream)
 | 
						|
    raise RuntimeError(stream.getvalue())
 | 
						|
 | 
						|
 | 
						|
class TestReport(BaseReport):
 | 
						|
    """ Basic test report object (also used for setup and teardown calls if
 | 
						|
    they fail).
 | 
						|
    """
 | 
						|
 | 
						|
    __test__ = False
 | 
						|
 | 
						|
    def __init__(
 | 
						|
        self,
 | 
						|
        nodeid,
 | 
						|
        location: Tuple[str, Optional[int], str],
 | 
						|
        keywords,
 | 
						|
        outcome,
 | 
						|
        longrepr,
 | 
						|
        when,
 | 
						|
        sections=(),
 | 
						|
        duration=0,
 | 
						|
        user_properties=None,
 | 
						|
        **extra
 | 
						|
    ) -> None:
 | 
						|
        #: normalized collection node id
 | 
						|
        self.nodeid = nodeid
 | 
						|
 | 
						|
        #: a (filesystempath, lineno, domaininfo) tuple indicating the
 | 
						|
        #: actual location of a test item - it might be different from the
 | 
						|
        #: collected one e.g. if a method is inherited from a different module.
 | 
						|
        self.location = location  # type: Tuple[str, Optional[int], str]
 | 
						|
 | 
						|
        #: a name -> value dictionary containing all keywords and
 | 
						|
        #: markers associated with a test invocation.
 | 
						|
        self.keywords = keywords
 | 
						|
 | 
						|
        #: test outcome, always one of "passed", "failed", "skipped".
 | 
						|
        self.outcome = outcome
 | 
						|
 | 
						|
        #: None or a failure representation.
 | 
						|
        self.longrepr = longrepr
 | 
						|
 | 
						|
        #: one of 'setup', 'call', 'teardown' to indicate runtest phase.
 | 
						|
        self.when = when
 | 
						|
 | 
						|
        #: user properties is a list of tuples (name, value) that holds user
 | 
						|
        #: defined properties of the test
 | 
						|
        self.user_properties = list(user_properties or [])
 | 
						|
 | 
						|
        #: list of pairs ``(str, str)`` of extra information which needs to
 | 
						|
        #: marshallable. Used by pytest to add captured text
 | 
						|
        #: from ``stdout`` and ``stderr``, but may be used by other plugins
 | 
						|
        #: to add arbitrary information to reports.
 | 
						|
        self.sections = list(sections)
 | 
						|
 | 
						|
        #: time it took to run just the test
 | 
						|
        self.duration = duration
 | 
						|
 | 
						|
        self.__dict__.update(extra)
 | 
						|
 | 
						|
    def __repr__(self):
 | 
						|
        return "<{} {!r} when={!r} outcome={!r}>".format(
 | 
						|
            self.__class__.__name__, self.nodeid, self.when, self.outcome
 | 
						|
        )
 | 
						|
 | 
						|
    @classmethod
 | 
						|
    def from_item_and_call(cls, item, call) -> "TestReport":
 | 
						|
        """
 | 
						|
        Factory method to create and fill a TestReport with standard item and call info.
 | 
						|
        """
 | 
						|
        when = call.when
 | 
						|
        duration = call.stop - call.start
 | 
						|
        keywords = {x: 1 for x in item.keywords}
 | 
						|
        excinfo = call.excinfo
 | 
						|
        sections = []
 | 
						|
        if not call.excinfo:
 | 
						|
            outcome = "passed"
 | 
						|
            longrepr = None
 | 
						|
        else:
 | 
						|
            if not isinstance(excinfo, ExceptionInfo):
 | 
						|
                outcome = "failed"
 | 
						|
                longrepr = excinfo
 | 
						|
            # Type ignored -- see comment where skip.Exception is defined.
 | 
						|
            elif excinfo.errisinstance(skip.Exception):  # type: ignore
 | 
						|
                outcome = "skipped"
 | 
						|
                r = excinfo._getreprcrash()
 | 
						|
                longrepr = (str(r.path), r.lineno, r.message)
 | 
						|
            else:
 | 
						|
                outcome = "failed"
 | 
						|
                if call.when == "call":
 | 
						|
                    longrepr = item.repr_failure(excinfo)
 | 
						|
                else:  # exception in setup or teardown
 | 
						|
                    longrepr = item._repr_failure_py(
 | 
						|
                        excinfo, style=item.config.getoption("tbstyle", "auto")
 | 
						|
                    )
 | 
						|
        for rwhen, key, content in item._report_sections:
 | 
						|
            sections.append(("Captured {} {}".format(key, rwhen), content))
 | 
						|
        return cls(
 | 
						|
            item.nodeid,
 | 
						|
            item.location,
 | 
						|
            keywords,
 | 
						|
            outcome,
 | 
						|
            longrepr,
 | 
						|
            when,
 | 
						|
            sections,
 | 
						|
            duration,
 | 
						|
            user_properties=item.user_properties,
 | 
						|
        )
 | 
						|
 | 
						|
 | 
						|
class CollectReport(BaseReport):
 | 
						|
    when = "collect"
 | 
						|
 | 
						|
    def __init__(
 | 
						|
        self, nodeid: str, outcome, longrepr, result: List[Node], sections=(), **extra
 | 
						|
    ) -> None:
 | 
						|
        self.nodeid = nodeid
 | 
						|
        self.outcome = outcome
 | 
						|
        self.longrepr = longrepr
 | 
						|
        self.result = result or []
 | 
						|
        self.sections = list(sections)
 | 
						|
        self.__dict__.update(extra)
 | 
						|
 | 
						|
    @property
 | 
						|
    def location(self):
 | 
						|
        return (self.fspath, None, self.fspath)
 | 
						|
 | 
						|
    def __repr__(self):
 | 
						|
        return "<CollectReport {!r} lenresult={} outcome={!r}>".format(
 | 
						|
            self.nodeid, len(self.result), self.outcome
 | 
						|
        )
 | 
						|
 | 
						|
 | 
						|
class CollectErrorRepr(TerminalRepr):
 | 
						|
    def __init__(self, msg):
 | 
						|
        self.longrepr = msg
 | 
						|
 | 
						|
    def toterminal(self, out) -> None:
 | 
						|
        out.line(self.longrepr, red=True)
 | 
						|
 | 
						|
 | 
						|
def pytest_report_to_serializable(report):
 | 
						|
    if isinstance(report, (TestReport, CollectReport)):
 | 
						|
        data = report._to_json()
 | 
						|
        data["$report_type"] = report.__class__.__name__
 | 
						|
        return data
 | 
						|
 | 
						|
 | 
						|
def pytest_report_from_serializable(data):
 | 
						|
    if "$report_type" in data:
 | 
						|
        if data["$report_type"] == "TestReport":
 | 
						|
            return TestReport._from_json(data)
 | 
						|
        elif data["$report_type"] == "CollectReport":
 | 
						|
            return CollectReport._from_json(data)
 | 
						|
        assert False, "Unknown report_type unserialize data: {}".format(
 | 
						|
            data["$report_type"]
 | 
						|
        )
 | 
						|
 | 
						|
 | 
						|
def _report_to_json(report):
 | 
						|
    """
 | 
						|
    This was originally the serialize_report() function from xdist (ca03269).
 | 
						|
 | 
						|
    Returns the contents of this report as a dict of builtin entries, suitable for
 | 
						|
    serialization.
 | 
						|
    """
 | 
						|
 | 
						|
    def serialize_repr_entry(entry):
 | 
						|
        entry_data = {"type": type(entry).__name__, "data": entry.__dict__.copy()}
 | 
						|
        for key, value in entry_data["data"].items():
 | 
						|
            if hasattr(value, "__dict__"):
 | 
						|
                entry_data["data"][key] = value.__dict__.copy()
 | 
						|
        return entry_data
 | 
						|
 | 
						|
    def serialize_repr_traceback(reprtraceback):
 | 
						|
        result = reprtraceback.__dict__.copy()
 | 
						|
        result["reprentries"] = [
 | 
						|
            serialize_repr_entry(x) for x in reprtraceback.reprentries
 | 
						|
        ]
 | 
						|
        return result
 | 
						|
 | 
						|
    def serialize_repr_crash(reprcrash: Optional[ReprFileLocation]):
 | 
						|
        if reprcrash is not None:
 | 
						|
            return reprcrash.__dict__.copy()
 | 
						|
        else:
 | 
						|
            return None
 | 
						|
 | 
						|
    def serialize_longrepr(rep):
 | 
						|
        result = {
 | 
						|
            "reprcrash": serialize_repr_crash(rep.longrepr.reprcrash),
 | 
						|
            "reprtraceback": serialize_repr_traceback(rep.longrepr.reprtraceback),
 | 
						|
            "sections": rep.longrepr.sections,
 | 
						|
        }
 | 
						|
        if isinstance(rep.longrepr, ExceptionChainRepr):
 | 
						|
            result["chain"] = []
 | 
						|
            for repr_traceback, repr_crash, description in rep.longrepr.chain:
 | 
						|
                result["chain"].append(
 | 
						|
                    (
 | 
						|
                        serialize_repr_traceback(repr_traceback),
 | 
						|
                        serialize_repr_crash(repr_crash),
 | 
						|
                        description,
 | 
						|
                    )
 | 
						|
                )
 | 
						|
        else:
 | 
						|
            result["chain"] = None
 | 
						|
        return result
 | 
						|
 | 
						|
    d = report.__dict__.copy()
 | 
						|
    if hasattr(report.longrepr, "toterminal"):
 | 
						|
        if hasattr(report.longrepr, "reprtraceback") and hasattr(
 | 
						|
            report.longrepr, "reprcrash"
 | 
						|
        ):
 | 
						|
            d["longrepr"] = serialize_longrepr(report)
 | 
						|
        else:
 | 
						|
            d["longrepr"] = str(report.longrepr)
 | 
						|
    else:
 | 
						|
        d["longrepr"] = report.longrepr
 | 
						|
    for name in d:
 | 
						|
        if isinstance(d[name], (py.path.local, Path)):
 | 
						|
            d[name] = str(d[name])
 | 
						|
        elif name == "result":
 | 
						|
            d[name] = None  # for now
 | 
						|
    return d
 | 
						|
 | 
						|
 | 
						|
def _report_kwargs_from_json(reportdict):
 | 
						|
    """
 | 
						|
    This was originally the serialize_report() function from xdist (ca03269).
 | 
						|
 | 
						|
    Returns **kwargs that can be used to construct a TestReport or CollectReport instance.
 | 
						|
    """
 | 
						|
 | 
						|
    def deserialize_repr_entry(entry_data):
 | 
						|
        data = entry_data["data"]
 | 
						|
        entry_type = entry_data["type"]
 | 
						|
        if entry_type == "ReprEntry":
 | 
						|
            reprfuncargs = None
 | 
						|
            reprfileloc = None
 | 
						|
            reprlocals = None
 | 
						|
            if data["reprfuncargs"]:
 | 
						|
                reprfuncargs = ReprFuncArgs(**data["reprfuncargs"])
 | 
						|
            if data["reprfileloc"]:
 | 
						|
                reprfileloc = ReprFileLocation(**data["reprfileloc"])
 | 
						|
            if data["reprlocals"]:
 | 
						|
                reprlocals = ReprLocals(data["reprlocals"]["lines"])
 | 
						|
 | 
						|
            reprentry = ReprEntry(
 | 
						|
                lines=data["lines"],
 | 
						|
                reprfuncargs=reprfuncargs,
 | 
						|
                reprlocals=reprlocals,
 | 
						|
                filelocrepr=reprfileloc,
 | 
						|
                style=data["style"],
 | 
						|
            )  # type: Union[ReprEntry, ReprEntryNative]
 | 
						|
        elif entry_type == "ReprEntryNative":
 | 
						|
            reprentry = ReprEntryNative(data["lines"])
 | 
						|
        else:
 | 
						|
            _report_unserialization_failure(entry_type, TestReport, reportdict)
 | 
						|
        return reprentry
 | 
						|
 | 
						|
    def deserialize_repr_traceback(repr_traceback_dict):
 | 
						|
        repr_traceback_dict["reprentries"] = [
 | 
						|
            deserialize_repr_entry(x) for x in repr_traceback_dict["reprentries"]
 | 
						|
        ]
 | 
						|
        return ReprTraceback(**repr_traceback_dict)
 | 
						|
 | 
						|
    def deserialize_repr_crash(repr_crash_dict: Optional[dict]):
 | 
						|
        if repr_crash_dict is not None:
 | 
						|
            return ReprFileLocation(**repr_crash_dict)
 | 
						|
        else:
 | 
						|
            return None
 | 
						|
 | 
						|
    if (
 | 
						|
        reportdict["longrepr"]
 | 
						|
        and "reprcrash" in reportdict["longrepr"]
 | 
						|
        and "reprtraceback" in reportdict["longrepr"]
 | 
						|
    ):
 | 
						|
 | 
						|
        reprtraceback = deserialize_repr_traceback(
 | 
						|
            reportdict["longrepr"]["reprtraceback"]
 | 
						|
        )
 | 
						|
        reprcrash = deserialize_repr_crash(reportdict["longrepr"]["reprcrash"])
 | 
						|
        if reportdict["longrepr"]["chain"]:
 | 
						|
            chain = []
 | 
						|
            for repr_traceback_data, repr_crash_data, description in reportdict[
 | 
						|
                "longrepr"
 | 
						|
            ]["chain"]:
 | 
						|
                chain.append(
 | 
						|
                    (
 | 
						|
                        deserialize_repr_traceback(repr_traceback_data),
 | 
						|
                        deserialize_repr_crash(repr_crash_data),
 | 
						|
                        description,
 | 
						|
                    )
 | 
						|
                )
 | 
						|
            exception_info = ExceptionChainRepr(
 | 
						|
                chain
 | 
						|
            )  # type: Union[ExceptionChainRepr,ReprExceptionInfo]
 | 
						|
        else:
 | 
						|
            exception_info = ReprExceptionInfo(reprtraceback, reprcrash)
 | 
						|
 | 
						|
        for section in reportdict["longrepr"]["sections"]:
 | 
						|
            exception_info.addsection(*section)
 | 
						|
        reportdict["longrepr"] = exception_info
 | 
						|
 | 
						|
    return reportdict
 |