292 lines
		
	
	
		
			9.6 KiB
		
	
	
	
		
			Python
		
	
	
	
			
		
		
	
	
			292 lines
		
	
	
		
			9.6 KiB
		
	
	
	
		
			Python
		
	
	
	
| '''some classes to handle text reports'''
 | |
| 
 | |
| import sys
 | |
| import py
 | |
| from py.__.test.terminal import out
 | |
| from py.__.test.terminal.terminal import TerminalSession
 | |
| 
 | |
| class Null:
 | |
|     """ Null objects always and reliably "do nothing." """
 | |
| 
 | |
|     def __init__(self, *args, **kwargs): pass
 | |
|     def __call__(self, *args, **kwargs): return self
 | |
|     def __repr__(self): return "Null()"
 | |
|     def __str__(self): return repr(self) + ' with id:' + str(id(self))
 | |
|     def __nonzero__(self): return 0
 | |
| 
 | |
|     def __getattr__(self, name): return self
 | |
|     def __setattr__(self, name, value): return self
 | |
|     def __delattr__(self, name): return self
 | |
| 
 | |
|     
 | |
| _NotExecuted = 'NotExecuted'
 | |
| _Passed = 'Passed'
 | |
| _Failed = 'Failed'
 | |
| _Skipped = 'Skipped'
 | |
| _ExceptionFailure = 'ExceptionFailure'
 | |
| 
 | |
| class Status(object):
 | |
|     '''Represents  py.test.Collector.Outcome as a string.
 | |
|     Possible values: NotExecuted, Passed, Skipped, Failed, ExceptionFailure'''
 | |
| 
 | |
|     def NotExecuted(cls):
 | |
|         return cls('NotExecuted')
 | |
|     NotExecuted = classmethod(NotExecuted) 
 | |
| 
 | |
|     def Passed(cls):
 | |
|         return cls('Passed')
 | |
|     Passed = classmethod(Passed) 
 | |
| 
 | |
|     def Failed(cls):
 | |
|         return cls('Failed')
 | |
|     Failed = classmethod(Failed) 
 | |
| 
 | |
|     def Skipped(cls):
 | |
|         return cls('Skipped')
 | |
|     Skipped = classmethod(Skipped) 
 | |
| 
 | |
|     def ExceptionFailure(cls):
 | |
|         return cls(_ExceptionFailure)
 | |
|     ExceptionFailure = classmethod(ExceptionFailure) 
 | |
| 
 | |
|     ordered_list = [_NotExecuted,
 | |
|                     _Passed,
 | |
|                     _Skipped,
 | |
|                     _Failed,
 | |
|                     _ExceptionFailure]
 | |
|         
 | |
|     namemap = {
 | |
|         py.test.Item.Passed: _Passed,
 | |
|         py.test.Item.Skipped: _Skipped,
 | |
|         py.test.Item.Failed: _Failed,
 | |
|         }
 | |
|     
 | |
|     def __init__(self, outcome_or_name = ''):
 | |
|         self.str = _NotExecuted
 | |
|         if isinstance(outcome_or_name, py.test.Item.Outcome):
 | |
|             for restype, name in self.namemap.items():
 | |
|                 if isinstance(outcome_or_name, restype):
 | |
|                     self.str = name
 | |
|         else:
 | |
|             if str(outcome_or_name) in self.ordered_list:
 | |
|                 self.str = str(outcome_or_name)
 | |
| 
 | |
|     def __repr__(self):
 | |
|         return 'Status("%s")' % self.str
 | |
| 
 | |
|     def __str__(self):
 | |
|         return self.str
 | |
| 
 | |
|     def update(self, status):
 | |
|         '''merge self and status, self will be set to the "higher" status
 | |
|         in ordered_list'''
 | |
|         name_int_map = dict(zip(self.ordered_list, range(len(self.ordered_list))))
 | |
|         self.str = self.ordered_list[max([name_int_map[i]
 | |
|                                           for i in (str(status), self.str)])]
 | |
|         
 | |
|     def __eq__(self, other):
 | |
|         return self.str == other.str
 | |
| 
 | |
|     def __ne__(self, other):
 | |
|         return not self.__eq__(other)
 | |
| 
 | |
| class OutBuffer(out.Out):
 | |
|     '''Simple MockObject for py.__.test.report.text.out.Out.
 | |
|     Used to get the output of TerminalSession.'''
 | |
|     def __init__(self, fullwidth = 80 -1):
 | |
|         self.output = []
 | |
|         self.fullwidth = fullwidth
 | |
| 
 | |
|     def line(self, s= ''):
 | |
|         self.output.append(str(s) + '\n')
 | |
| 
 | |
|     def write(self, s):
 | |
|         self.output.append(str(s))
 | |
| 
 | |
|     def getoutput(self):
 | |
|         return ''.join(self.output)
 | |
| 
 | |
|     def rewrite(self, s=''):
 | |
|         self.write(s)
 | |
| 
 | |
|    
 | |
| 
 | |
| 
 | |
| class TestReport(object):
 | |
|     '''"Channel-save" report of a py.test.Collector.Outcome instance'''   
 | |
| 
 | |
|     root_id = 'TestReport Root ID'
 | |
|     template = {'time' : 0,
 | |
|                 'label': 'Root',
 | |
|                 'id': root_id,
 | |
|                 'full_id': tuple(),
 | |
|                 'status': Status.NotExecuted(),
 | |
|                 'report': 'NoReport',
 | |
|                 'error_report': '',
 | |
|                 'finished': False,
 | |
|                 'restart_params': None, # ('',('',))
 | |
|                 'path' : '',
 | |
|                 'modpath': '',
 | |
|                 'is_item': False,
 | |
|                 'stdout': '',
 | |
|                 'stderr': '',
 | |
|                 }
 | |
|     Status = Status
 | |
|     
 | |
|     def fromChannel(cls, kwdict):
 | |
|         ''' TestReport.fromChannel(report.toChannel()) == report '''
 | |
|         if 'status' in kwdict:
 | |
|             kwdict['status'] = Status(kwdict['status'])
 | |
|         return cls(**kwdict)
 | |
|     fromChannel = classmethod(fromChannel)
 | |
| 
 | |
|     def __init__(self, **kwargs):
 | |
|         # copy status -> deepcopy
 | |
|         kwdict = py.std.copy.deepcopy(self.template)
 | |
|         kwdict.update(kwargs)
 | |
|         for key, value in kwdict.iteritems():
 | |
|             setattr(self, key, value)
 | |
| 
 | |
|     def start(self, collector):
 | |
|         '''Session.start should call this to init the report'''
 | |
|         self.full_id = tuple(collector.listnames())
 | |
|         self.id = collector.name
 | |
|         if collector.getpathlineno(): # save for Null() in test_util.py
 | |
|             fspath, lineno = collector.getpathlineno()
 | |
|             if lineno != sys.maxint:
 | |
|                 str_append = ' [%s:%s]' % (fspath.basename, lineno)
 | |
|             else:
 | |
|                 str_append = ' [%s]' % fspath.basename
 | |
|             self.label = collector.name + str_append
 | |
|             
 | |
|         self.path = '/'.join(collector.listnames())
 | |
|         #self.modpath = collector.getmodpath()
 | |
|         self.settime()
 | |
|         self.restart_params = (str(collector.listchain()[0].fspath),
 | |
|                                collector.listnames())
 | |
|         self.status = Status.NotExecuted()
 | |
|         self.is_item = isinstance(collector, py.test.Item)
 | |
| 
 | |
|     def finish(self, collector, res, config = Null()):
 | |
|         '''Session.finish should call this to set the
 | |
|         value of error_report
 | |
|         option is passed to Session at initialization'''
 | |
|         self.settime()
 | |
|         if collector.getpathlineno(): # save for Null() in test_util.py
 | |
|             fspath, lineno = collector.getpathlineno()
 | |
|             if lineno != sys.maxint:
 | |
|                 str_append = ' [%s:%s] %0.2fsecs' % (fspath.basename,
 | |
|                                                      lineno, self.time)
 | |
|             else:
 | |
|                 str_append = ' [%s] %0.2fsecs' % (fspath.basename, self.time)
 | |
|             self.label = collector.name + str_append
 | |
|         if res:
 | |
|             if Status(res) == Status.Failed():
 | |
|                 self.error_report = self.report_failed(config, collector, res)
 | |
|             elif Status(res) ==  Status.Skipped():
 | |
|                 self.error_report = self.report_skipped(config, collector, res)
 | |
|             self.status.update(Status(res))
 | |
|         out, err = collector.getouterr()
 | |
|         self.stdout, self.stderr = str(out), str(err)
 | |
|         self.finished = True
 | |
| 
 | |
|     def abbrev_path(self, fspath):
 | |
|         parts = fspath.parts() 
 | |
|         basename = parts.pop().basename
 | |
|         while parts and parts[-1].basename in ('testing', 'test'): 
 | |
|             parts.pop() 
 | |
|         base = parts[-1].basename
 | |
|         if len(base) < 13: 
 | |
|             base = base + "_" * (13-len(base))
 | |
|         return base + "_" + basename 
 | |
| 
 | |
|     def report_failed(self, config, item, res):
 | |
|         #XXX hack abuse of TerminalSession
 | |
|         terminal = TerminalSession(config)
 | |
|         out = OutBuffer()
 | |
|         terminal.out = out
 | |
|         terminal.repr_failure(item, res)
 | |
|         #terminal.repr_out_err(item)
 | |
|         return out.getoutput()
 | |
| 
 | |
|     def report_skipped(self, config, item, outcome):
 | |
|         texts = {}
 | |
|         terminal = TerminalSession(config)
 | |
|         raisingtb = terminal.getlastvisible(outcome.excinfo.traceback) 
 | |
|         fn = raisingtb.frame.code.path
 | |
|         lineno = raisingtb.lineno
 | |
|         d = texts.setdefault(outcome.excinfo.exconly(), {})
 | |
|         d[(fn, lineno)] = outcome
 | |
|         out = OutBuffer()
 | |
|         out.sep('_', 'reasons for skipped tests')
 | |
|         for text, dict in texts.items():
 | |
|             for (fn, lineno), outcome in dict.items(): 
 | |
|                 out.line('Skipped in %s:%d' %(fn, lineno))
 | |
|             out.line("reason: %s" % text) 
 | |
| 
 | |
|         return out.getoutput()
 | |
|         
 | |
|     def settime(self):
 | |
|         '''update self.time '''
 | |
|         self.time = py.std.time.time() - self.time 
 | |
| 
 | |
|     def to_channel(self):
 | |
|         '''counterpart of classmethod fromChannel'''
 | |
|         ret = self.template.copy()
 | |
|         for key in ret.keys():
 | |
|             ret[key] = getattr(self, key, self.template[key])
 | |
|         ret['status'] = str(ret['status'])
 | |
|         return ret
 | |
| 
 | |
|     def __str__(self):
 | |
|         return str(self.to_channel())
 | |
| 
 | |
|     def __repr__(self):
 | |
|         return str(self)
 | |
| 
 | |
|     def copy(self, **kwargs):
 | |
|         channel_dict = self.to_channel()
 | |
|         channel_dict.update(kwargs)
 | |
|         return TestReport.fromChannel(channel_dict)
 | |
| 
 | |
| 
 | |
| class TestFileWatcher:
 | |
|     '''watches files or paths'''
 | |
|     def __init__(self, *paths):
 | |
|         self.paths = [py.path.local(path) for path in paths]
 | |
|         self.watchdict = dict()
 | |
| 
 | |
|     def file_information(self, path):
 | |
|         try:
 | |
|             return path.stat().st_ctime
 | |
|         except:
 | |
|             return None
 | |
| 
 | |
|     def check_files(self):
 | |
|         '''returns (changed files, deleted files)'''
 | |
|         def fil(p):
 | |
|             return p.check(fnmatch='[!.]*.py')
 | |
|         def rec(p):
 | |
|             return p.check(dotfile=0)
 | |
|         files = []
 | |
|         for path in self.paths:
 | |
|             if path.check(file=1):
 | |
|                 files.append(path)
 | |
|             else:
 | |
|                 files.extend(path.visit(fil, rec))
 | |
|         newdict = dict(zip(files, [self.file_information(p) for p in files]))
 | |
|         files_deleted = [f for f in self.watchdict.keys()
 | |
|                          if not newdict.has_key(f)]
 | |
|         files_new = [f for f in newdict.keys() if not self.watchdict.has_key(f)]
 | |
|         files_changed = [f for f in newdict.keys() if self.watchdict.has_key(f)
 | |
|                          and newdict[f]!= self.watchdict[f]]
 | |
|         files_changed = files_new + files_changed
 | |
| 
 | |
|         self.watchdict = newdict
 | |
|         return files_changed, files_deleted
 | |
| 
 | |
|     def changed(self):
 | |
|         '''returns False if nothing changed'''
 | |
|         changed, deleted = self.check_files()
 | |
|         return changed != [] or deleted != []
 |