259 lines
		
	
	
		
			8.3 KiB
		
	
	
	
		
			Python
		
	
	
	
			
		
		
	
	
			259 lines
		
	
	
		
			8.3 KiB
		
	
	
	
		
			Python
		
	
	
	
""" 
 | 
						|
    collect and run test items. 
 | 
						|
 | 
						|
    * executing test items 
 | 
						|
    * running collectors 
 | 
						|
    * and generating report events about it 
 | 
						|
"""
 | 
						|
 | 
						|
import py
 | 
						|
 | 
						|
from py.__.test.outcome import Skipped
 | 
						|
 | 
						|
#
 | 
						|
# pytest plugin hooks 
 | 
						|
 | 
						|
def pytest_addoption(parser):
 | 
						|
    group = parser.getgroup("general") 
 | 
						|
    group.addoption('--boxed',
 | 
						|
               action="store_true", dest="boxed", default=False,
 | 
						|
               help="box each test run in a separate process") 
 | 
						|
 | 
						|
# XXX move to pytest_sessionstart and fix py.test owns tests 
 | 
						|
def pytest_configure(config):
 | 
						|
    config._setupstate = SetupState()
 | 
						|
 | 
						|
def pytest_sessionfinish(session, exitstatus, excrepr=None):
 | 
						|
    # XXX see above
 | 
						|
    if hasattr(session.config, '_setupstate'):
 | 
						|
        session.config._setupstate.teardown_all()
 | 
						|
 | 
						|
def pytest_make_collect_report(collector):
 | 
						|
    call = collector.config.guardedcall(
 | 
						|
        lambda: collector._memocollect()
 | 
						|
    )
 | 
						|
    result = None
 | 
						|
    if not call.excinfo:
 | 
						|
        result = call.result
 | 
						|
    return CollectReport(collector, result, call.excinfo, call.outerr)
 | 
						|
 | 
						|
    return report 
 | 
						|
 | 
						|
def pytest_runtest_protocol(item):
 | 
						|
    if item.config.getvalue("boxed"):
 | 
						|
        reports = forked_run_report(item) 
 | 
						|
        for rep in reports:
 | 
						|
            item.config.hook.pytest_runtest_logreport(rep=rep)
 | 
						|
    else:
 | 
						|
        runtestprotocol(item)
 | 
						|
    return True
 | 
						|
 | 
						|
def runtestprotocol(item, log=True):
 | 
						|
    rep = call_and_report(item, "setup", log)
 | 
						|
    reports = [rep]
 | 
						|
    if rep.passed:
 | 
						|
        reports.append(call_and_report(item, "call", log))
 | 
						|
        reports.append(call_and_report(item, "teardown", log))
 | 
						|
    return reports
 | 
						|
 | 
						|
def pytest_runtest_setup(item):
 | 
						|
    item.config._setupstate.prepare(item)
 | 
						|
 | 
						|
def pytest_runtest_call(item):
 | 
						|
    if not item._deprecated_testexecution():
 | 
						|
        item.runtest()
 | 
						|
 | 
						|
def pytest_runtest_makereport(item, call):
 | 
						|
    return ItemTestReport(item, call.excinfo, call.when, call.outerr)
 | 
						|
 | 
						|
def pytest_runtest_teardown(item):
 | 
						|
    item.config._setupstate.teardown_exact(item)
 | 
						|
 | 
						|
#
 | 
						|
# Implementation
 | 
						|
 | 
						|
def call_and_report(item, when, log=True):
 | 
						|
    call = RuntestHookCall(item, when)
 | 
						|
    hook = item.config.hook
 | 
						|
    report = hook.pytest_runtest_makereport(item=item, call=call)
 | 
						|
    if log and (when == "call" or not report.passed):
 | 
						|
        hook.pytest_runtest_logreport(rep=report) 
 | 
						|
    return report
 | 
						|
 | 
						|
 | 
						|
class RuntestHookCall:
 | 
						|
    excinfo = None 
 | 
						|
    _prefix = "pytest_runtest_"
 | 
						|
    def __init__(self, item, when):
 | 
						|
        self.when = when 
 | 
						|
        hookname = self._prefix + when 
 | 
						|
        hook = getattr(item.config.hook, hookname)
 | 
						|
        capture = item.config._getcapture()
 | 
						|
        try:
 | 
						|
            try:
 | 
						|
                self.result = hook(item=item)
 | 
						|
            except KeyboardInterrupt:
 | 
						|
                raise
 | 
						|
            except:
 | 
						|
                self.excinfo = py.code.ExceptionInfo()
 | 
						|
        finally:
 | 
						|
            self.outerr = capture.reset()
 | 
						|
 | 
						|
def forked_run_report(item):
 | 
						|
    # for now, we run setup/teardown in the subprocess 
 | 
						|
    # XXX optionally allow sharing of setup/teardown 
 | 
						|
    EXITSTATUS_TESTEXIT = 4
 | 
						|
    from py.__.test.dist.mypickle import ImmutablePickler
 | 
						|
    ipickle = ImmutablePickler(uneven=0)
 | 
						|
    ipickle.selfmemoize(item.config)
 | 
						|
    # XXX workaround the issue that 2.6 cannot pickle 
 | 
						|
    # instances of classes defined in global conftest.py files
 | 
						|
    ipickle.selfmemoize(item) 
 | 
						|
    def runforked():
 | 
						|
        try:
 | 
						|
            reports = runtestprotocol(item, log=False)
 | 
						|
        except KeyboardInterrupt: 
 | 
						|
            py.std.os._exit(EXITSTATUS_TESTEXIT)
 | 
						|
        return ipickle.dumps(reports)
 | 
						|
 | 
						|
    ff = py.process.ForkedFunc(runforked)
 | 
						|
    result = ff.waitfinish()
 | 
						|
    if result.retval is not None:
 | 
						|
        return ipickle.loads(result.retval)
 | 
						|
    else:
 | 
						|
        if result.exitstatus == EXITSTATUS_TESTEXIT:
 | 
						|
            py.test.exit("forked test item %s raised Exit" %(item,))
 | 
						|
        return [report_process_crash(item, result)]
 | 
						|
 | 
						|
def report_process_crash(item, result):
 | 
						|
    path, lineno = item._getfslineno()
 | 
						|
    info = "%s:%s: running the test CRASHED with signal %d" %(
 | 
						|
            path, lineno, result.signal)
 | 
						|
    return ItemTestReport(item, excinfo=info, when="???")
 | 
						|
 | 
						|
class BaseReport(object):
 | 
						|
    def __repr__(self):
 | 
						|
        l = ["%s=%s" %(key, value)
 | 
						|
           for key, value in self.__dict__.items()]
 | 
						|
        return "<%s %s>" %(self.__class__.__name__, " ".join(l),)
 | 
						|
 | 
						|
    def toterminal(self, out):
 | 
						|
        longrepr = self.longrepr 
 | 
						|
        if hasattr(longrepr, 'toterminal'):
 | 
						|
            longrepr.toterminal(out)
 | 
						|
        else:
 | 
						|
            out.line(str(longrepr))
 | 
						|
   
 | 
						|
class ItemTestReport(BaseReport):
 | 
						|
    failed = passed = skipped = False
 | 
						|
 | 
						|
    def __init__(self, item, excinfo=None, when=None, outerr=None):
 | 
						|
        self.item = item 
 | 
						|
        self.when = when
 | 
						|
        self.outerr = outerr
 | 
						|
        if item and when != "setup":
 | 
						|
            self.keywords = item.readkeywords() 
 | 
						|
        else:
 | 
						|
            # if we fail during setup it might mean 
 | 
						|
            # we are not able to access the underlying object
 | 
						|
            # this might e.g. happen if we are unpickled 
 | 
						|
            # and our parent collector did not collect us 
 | 
						|
            # (because it e.g. skipped for platform reasons)
 | 
						|
            self.keywords = {}  
 | 
						|
        if not excinfo:
 | 
						|
            self.passed = True
 | 
						|
            self.shortrepr = "." 
 | 
						|
        else:
 | 
						|
            if not isinstance(excinfo, py.code.ExceptionInfo):
 | 
						|
                self.failed = True
 | 
						|
                shortrepr = "?"
 | 
						|
                longrepr = excinfo 
 | 
						|
            elif excinfo.errisinstance(Skipped):
 | 
						|
                self.skipped = True 
 | 
						|
                shortrepr = "s"
 | 
						|
                longrepr = self.item._repr_failure_py(excinfo, outerr)
 | 
						|
            else:
 | 
						|
                self.failed = True
 | 
						|
                shortrepr = self.item.shortfailurerepr
 | 
						|
                if self.when == "call":
 | 
						|
                    longrepr = self.item.repr_failure(excinfo, outerr)
 | 
						|
                else: # exception in setup or teardown 
 | 
						|
                    longrepr = self.item._repr_failure_py(excinfo, outerr)
 | 
						|
                    shortrepr = shortrepr.lower()
 | 
						|
            self.shortrepr = shortrepr 
 | 
						|
            self.longrepr = longrepr 
 | 
						|
 | 
						|
    def getnode(self):
 | 
						|
        return self.item 
 | 
						|
 | 
						|
class CollectReport(BaseReport):
 | 
						|
    skipped = failed = passed = False 
 | 
						|
 | 
						|
    def __init__(self, collector, result, excinfo=None, outerr=None):
 | 
						|
        self.collector = collector 
 | 
						|
        if not excinfo:
 | 
						|
            self.passed = True
 | 
						|
            self.result = result 
 | 
						|
        else:
 | 
						|
            self.outerr = outerr
 | 
						|
            self.longrepr = self.collector._repr_failure_py(excinfo, outerr)
 | 
						|
            if excinfo.errisinstance(Skipped):
 | 
						|
                self.skipped = True
 | 
						|
                self.reason = str(excinfo.value)
 | 
						|
            else:
 | 
						|
                self.failed = True
 | 
						|
 | 
						|
    def getnode(self):
 | 
						|
        return self.collector 
 | 
						|
 | 
						|
class SetupState(object):
 | 
						|
    """ shared state for setting up/tearing down test items or collectors. """
 | 
						|
    def __init__(self):
 | 
						|
        self.stack = []
 | 
						|
        self._finalizers = {}
 | 
						|
 | 
						|
    def addfinalizer(self, finalizer, colitem):
 | 
						|
        """ attach a finalizer to the given colitem. 
 | 
						|
        if colitem is None, this will add a finalizer that 
 | 
						|
        is called at the end of teardown_all(). 
 | 
						|
        """
 | 
						|
        assert callable(finalizer)
 | 
						|
        #assert colitem in self.stack
 | 
						|
        self._finalizers.setdefault(colitem, []).append(finalizer)
 | 
						|
 | 
						|
    def _pop_and_teardown(self):
 | 
						|
        colitem = self.stack.pop()
 | 
						|
        self._teardown_with_finalization(colitem)
 | 
						|
 | 
						|
    def _teardown_with_finalization(self, colitem): 
 | 
						|
        finalizers = self._finalizers.pop(colitem, None)
 | 
						|
        while finalizers:
 | 
						|
            fin = finalizers.pop()
 | 
						|
            fin()
 | 
						|
        if colitem: 
 | 
						|
            colitem.teardown()
 | 
						|
        for colitem in self._finalizers:
 | 
						|
            assert colitem is None or colitem in self.stack
 | 
						|
 | 
						|
    def teardown_all(self): 
 | 
						|
        while self.stack: 
 | 
						|
            self._pop_and_teardown()
 | 
						|
        self._teardown_with_finalization(None)
 | 
						|
        assert not self._finalizers
 | 
						|
 | 
						|
    def teardown_exact(self, item):
 | 
						|
        assert self.stack and self.stack[-1] == item
 | 
						|
        self._pop_and_teardown()
 | 
						|
     
 | 
						|
    def prepare(self, colitem): 
 | 
						|
        """ setup objects along the collector chain to the test-method
 | 
						|
            Teardown any unneccessary previously setup objects."""
 | 
						|
        needed_collectors = colitem.listchain() 
 | 
						|
        while self.stack: 
 | 
						|
            if self.stack == needed_collectors[:len(self.stack)]: 
 | 
						|
                break 
 | 
						|
            self._pop_and_teardown()
 | 
						|
        for col in needed_collectors[len(self.stack):]: 
 | 
						|
            col.setup() 
 | 
						|
            self.stack.append(col) 
 |