292 lines
		
	
	
		
			9.6 KiB
		
	
	
	
		
			Python
		
	
	
	
			
		
		
	
	
			292 lines
		
	
	
		
			9.6 KiB
		
	
	
	
		
			Python
		
	
	
	
'''some classes to handle text reports'''
 | 
						|
 | 
						|
import sys
 | 
						|
import py
 | 
						|
from py.__.test.terminal import out
 | 
						|
from py.__.test.terminal.terminal import TerminalSession
 | 
						|
 | 
						|
class Null:
 | 
						|
    """ Null objects always and reliably "do nothing." """
 | 
						|
 | 
						|
    def __init__(self, *args, **kwargs): pass
 | 
						|
    def __call__(self, *args, **kwargs): return self
 | 
						|
    def __repr__(self): return "Null()"
 | 
						|
    def __str__(self): return repr(self) + ' with id:' + str(id(self))
 | 
						|
    def __nonzero__(self): return 0
 | 
						|
 | 
						|
    def __getattr__(self, name): return self
 | 
						|
    def __setattr__(self, name, value): return self
 | 
						|
    def __delattr__(self, name): return self
 | 
						|
 | 
						|
    
 | 
						|
_NotExecuted = 'NotExecuted'
 | 
						|
_Passed = 'Passed'
 | 
						|
_Failed = 'Failed'
 | 
						|
_Skipped = 'Skipped'
 | 
						|
_ExceptionFailure = 'ExceptionFailure'
 | 
						|
 | 
						|
class Status(object):
 | 
						|
    '''Represents  py.test.Collector.Outcome as a string.
 | 
						|
    Possible values: NotExecuted, Passed, Skipped, Failed, ExceptionFailure'''
 | 
						|
 | 
						|
    def NotExecuted(cls):
 | 
						|
        return cls('NotExecuted')
 | 
						|
    NotExecuted = classmethod(NotExecuted) 
 | 
						|
 | 
						|
    def Passed(cls):
 | 
						|
        return cls('Passed')
 | 
						|
    Passed = classmethod(Passed) 
 | 
						|
 | 
						|
    def Failed(cls):
 | 
						|
        return cls('Failed')
 | 
						|
    Failed = classmethod(Failed) 
 | 
						|
 | 
						|
    def Skipped(cls):
 | 
						|
        return cls('Skipped')
 | 
						|
    Skipped = classmethod(Skipped) 
 | 
						|
 | 
						|
    def ExceptionFailure(cls):
 | 
						|
        return cls(_ExceptionFailure)
 | 
						|
    ExceptionFailure = classmethod(ExceptionFailure) 
 | 
						|
 | 
						|
    ordered_list = [_NotExecuted,
 | 
						|
                    _Passed,
 | 
						|
                    _Skipped,
 | 
						|
                    _Failed,
 | 
						|
                    _ExceptionFailure]
 | 
						|
        
 | 
						|
    namemap = {
 | 
						|
        py.test.Item.Passed: _Passed,
 | 
						|
        py.test.Item.Skipped: _Skipped,
 | 
						|
        py.test.Item.Failed: _Failed,
 | 
						|
        }
 | 
						|
    
 | 
						|
    def __init__(self, outcome_or_name = ''):
 | 
						|
        self.str = _NotExecuted
 | 
						|
        if isinstance(outcome_or_name, py.test.Item.Outcome):
 | 
						|
            for restype, name in self.namemap.items():
 | 
						|
                if isinstance(outcome_or_name, restype):
 | 
						|
                    self.str = name
 | 
						|
        else:
 | 
						|
            if str(outcome_or_name) in self.ordered_list:
 | 
						|
                self.str = str(outcome_or_name)
 | 
						|
 | 
						|
    def __repr__(self):
 | 
						|
        return 'Status("%s")' % self.str
 | 
						|
 | 
						|
    def __str__(self):
 | 
						|
        return self.str
 | 
						|
 | 
						|
    def update(self, status):
 | 
						|
        '''merge self and status, self will be set to the "higher" status
 | 
						|
        in ordered_list'''
 | 
						|
        name_int_map = dict(zip(self.ordered_list, range(len(self.ordered_list))))
 | 
						|
        self.str = self.ordered_list[max([name_int_map[i]
 | 
						|
                                          for i in (str(status), self.str)])]
 | 
						|
        
 | 
						|
    def __eq__(self, other):
 | 
						|
        return self.str == other.str
 | 
						|
 | 
						|
    def __ne__(self, other):
 | 
						|
        return not self.__eq__(other)
 | 
						|
 | 
						|
class OutBuffer(out.Out):
 | 
						|
    '''Simple MockObject for py.__.test.report.text.out.Out.
 | 
						|
    Used to get the output of TerminalSession.'''
 | 
						|
    def __init__(self, fullwidth = 80 -1):
 | 
						|
        self.output = []
 | 
						|
        self.fullwidth = fullwidth
 | 
						|
 | 
						|
    def line(self, s= ''):
 | 
						|
        self.output.append(str(s) + '\n')
 | 
						|
 | 
						|
    def write(self, s):
 | 
						|
        self.output.append(str(s))
 | 
						|
 | 
						|
    def getoutput(self):
 | 
						|
        return ''.join(self.output)
 | 
						|
 | 
						|
    def rewrite(self, s=''):
 | 
						|
        self.write(s)
 | 
						|
 | 
						|
   
 | 
						|
 | 
						|
 | 
						|
class TestReport(object):
 | 
						|
    '''"Channel-save" report of a py.test.Collector.Outcome instance'''   
 | 
						|
 | 
						|
    root_id = 'TestReport Root ID'
 | 
						|
    template = {'time' : 0,
 | 
						|
                'label': 'Root',
 | 
						|
                'id': root_id,
 | 
						|
                'full_id': tuple(),
 | 
						|
                'status': Status.NotExecuted(),
 | 
						|
                'report': 'NoReport',
 | 
						|
                'error_report': '',
 | 
						|
                'finished': False,
 | 
						|
                'restart_params': None, # ('',('',))
 | 
						|
                'path' : '',
 | 
						|
                'modpath': '',
 | 
						|
                'is_item': False,
 | 
						|
                'stdout': '',
 | 
						|
                'stderr': '',
 | 
						|
                }
 | 
						|
    Status = Status
 | 
						|
    
 | 
						|
    def fromChannel(cls, kwdict):
 | 
						|
        ''' TestReport.fromChannel(report.toChannel()) == report '''
 | 
						|
        if 'status' in kwdict:
 | 
						|
            kwdict['status'] = Status(kwdict['status'])
 | 
						|
        return cls(**kwdict)
 | 
						|
    fromChannel = classmethod(fromChannel)
 | 
						|
 | 
						|
    def __init__(self, **kwargs):
 | 
						|
        # copy status -> deepcopy
 | 
						|
        kwdict = py.std.copy.deepcopy(self.template)
 | 
						|
        kwdict.update(kwargs)
 | 
						|
        for key, value in kwdict.iteritems():
 | 
						|
            setattr(self, key, value)
 | 
						|
 | 
						|
    def start(self, collector):
 | 
						|
        '''Session.start should call this to init the report'''
 | 
						|
        self.full_id = tuple(collector.listnames())
 | 
						|
        self.id = collector.name
 | 
						|
        if collector.getpathlineno(): # save for Null() in test_util.py
 | 
						|
            fspath, lineno = collector.getpathlineno()
 | 
						|
            if lineno != sys.maxint:
 | 
						|
                str_append = ' [%s:%s]' % (fspath.basename, lineno)
 | 
						|
            else:
 | 
						|
                str_append = ' [%s]' % fspath.basename
 | 
						|
            self.label = collector.name + str_append
 | 
						|
            
 | 
						|
        self.path = '/'.join(collector.listnames())
 | 
						|
        #self.modpath = collector.getmodpath()
 | 
						|
        self.settime()
 | 
						|
        self.restart_params = (str(collector.listchain()[0].fspath),
 | 
						|
                               collector.listnames())
 | 
						|
        self.status = Status.NotExecuted()
 | 
						|
        self.is_item = isinstance(collector, py.test.Item)
 | 
						|
 | 
						|
    def finish(self, collector, res, config = Null()):
 | 
						|
        '''Session.finish should call this to set the
 | 
						|
        value of error_report
 | 
						|
        option is passed to Session at initialization'''
 | 
						|
        self.settime()
 | 
						|
        if collector.getpathlineno(): # save for Null() in test_util.py
 | 
						|
            fspath, lineno = collector.getpathlineno()
 | 
						|
            if lineno != sys.maxint:
 | 
						|
                str_append = ' [%s:%s] %0.2fsecs' % (fspath.basename,
 | 
						|
                                                     lineno, self.time)
 | 
						|
            else:
 | 
						|
                str_append = ' [%s] %0.2fsecs' % (fspath.basename, self.time)
 | 
						|
            self.label = collector.name + str_append
 | 
						|
        if res:
 | 
						|
            if Status(res) == Status.Failed():
 | 
						|
                self.error_report = self.report_failed(config, collector, res)
 | 
						|
            elif Status(res) ==  Status.Skipped():
 | 
						|
                self.error_report = self.report_skipped(config, collector, res)
 | 
						|
            self.status.update(Status(res))
 | 
						|
        out, err = collector.getouterr()
 | 
						|
        self.stdout, self.stderr = str(out), str(err)
 | 
						|
        self.finished = True
 | 
						|
 | 
						|
    def abbrev_path(self, fspath):
 | 
						|
        parts = fspath.parts() 
 | 
						|
        basename = parts.pop().basename
 | 
						|
        while parts and parts[-1].basename in ('testing', 'test'): 
 | 
						|
            parts.pop() 
 | 
						|
        base = parts[-1].basename
 | 
						|
        if len(base) < 13: 
 | 
						|
            base = base + "_" * (13-len(base))
 | 
						|
        return base + "_" + basename 
 | 
						|
 | 
						|
    def report_failed(self, config, item, res):
 | 
						|
        #XXX hack abuse of TerminalSession
 | 
						|
        terminal = TerminalSession(config)
 | 
						|
        out = OutBuffer()
 | 
						|
        terminal.out = out
 | 
						|
        terminal.repr_failure(item, res)
 | 
						|
        #terminal.repr_out_err(item)
 | 
						|
        return out.getoutput()
 | 
						|
 | 
						|
    def report_skipped(self, config, item, outcome):
 | 
						|
        texts = {}
 | 
						|
        terminal = TerminalSession(config)
 | 
						|
        raisingtb = terminal.getlastvisible(outcome.excinfo.traceback) 
 | 
						|
        fn = raisingtb.frame.code.path
 | 
						|
        lineno = raisingtb.lineno
 | 
						|
        d = texts.setdefault(outcome.excinfo.exconly(), {})
 | 
						|
        d[(fn, lineno)] = outcome
 | 
						|
        out = OutBuffer()
 | 
						|
        out.sep('_', 'reasons for skipped tests')
 | 
						|
        for text, dict in texts.items():
 | 
						|
            for (fn, lineno), outcome in dict.items(): 
 | 
						|
                out.line('Skipped in %s:%d' %(fn, lineno))
 | 
						|
            out.line("reason: %s" % text) 
 | 
						|
 | 
						|
        return out.getoutput()
 | 
						|
        
 | 
						|
    def settime(self):
 | 
						|
        '''update self.time '''
 | 
						|
        self.time = py.std.time.time() - self.time 
 | 
						|
 | 
						|
    def to_channel(self):
 | 
						|
        '''counterpart of classmethod fromChannel'''
 | 
						|
        ret = self.template.copy()
 | 
						|
        for key in ret.keys():
 | 
						|
            ret[key] = getattr(self, key, self.template[key])
 | 
						|
        ret['status'] = str(ret['status'])
 | 
						|
        return ret
 | 
						|
 | 
						|
    def __str__(self):
 | 
						|
        return str(self.to_channel())
 | 
						|
 | 
						|
    def __repr__(self):
 | 
						|
        return str(self)
 | 
						|
 | 
						|
    def copy(self, **kwargs):
 | 
						|
        channel_dict = self.to_channel()
 | 
						|
        channel_dict.update(kwargs)
 | 
						|
        return TestReport.fromChannel(channel_dict)
 | 
						|
 | 
						|
 | 
						|
class TestFileWatcher:
 | 
						|
    '''watches files or paths'''
 | 
						|
    def __init__(self, *paths):
 | 
						|
        self.paths = [py.path.local(path) for path in paths]
 | 
						|
        self.watchdict = dict()
 | 
						|
 | 
						|
    def file_information(self, path):
 | 
						|
        try:
 | 
						|
            return path.stat().st_ctime
 | 
						|
        except:
 | 
						|
            return None
 | 
						|
 | 
						|
    def check_files(self):
 | 
						|
        '''returns (changed files, deleted files)'''
 | 
						|
        def fil(p):
 | 
						|
            return p.check(fnmatch='[!.]*.py')
 | 
						|
        def rec(p):
 | 
						|
            return p.check(dotfile=0)
 | 
						|
        files = []
 | 
						|
        for path in self.paths:
 | 
						|
            if path.check(file=1):
 | 
						|
                files.append(path)
 | 
						|
            else:
 | 
						|
                files.extend(path.visit(fil, rec))
 | 
						|
        newdict = dict(zip(files, [self.file_information(p) for p in files]))
 | 
						|
        files_deleted = [f for f in self.watchdict.keys()
 | 
						|
                         if not newdict.has_key(f)]
 | 
						|
        files_new = [f for f in newdict.keys() if not self.watchdict.has_key(f)]
 | 
						|
        files_changed = [f for f in newdict.keys() if self.watchdict.has_key(f)
 | 
						|
                         and newdict[f]!= self.watchdict[f]]
 | 
						|
        files_changed = files_new + files_changed
 | 
						|
 | 
						|
        self.watchdict = newdict
 | 
						|
        return files_changed, files_deleted
 | 
						|
 | 
						|
    def changed(self):
 | 
						|
        '''returns False if nothing changed'''
 | 
						|
        changed, deleted = self.check_files()
 | 
						|
        return changed != [] or deleted != []
 |