149 lines
		
	
	
		
			4.6 KiB
		
	
	
	
		
			Python
		
	
	
	
			
		
		
	
	
			149 lines
		
	
	
		
			4.6 KiB
		
	
	
	
		
			Python
		
	
	
	
| import os
 | |
| import sys
 | |
| import py
 | |
| try: from cStringIO import StringIO
 | |
| except ImportError: from StringIO import StringIO
 | |
| 
 | |
| 
 | |
| class Capture(object):
 | |
|     def call(cls, func, *args, **kwargs): 
 | |
|         """ return a (res, out, err) tuple where
 | |
|             out and err represent the output/error output
 | |
|             during function execution. 
 | |
|             call the given function with args/kwargs
 | |
|             and capture output/error during its execution. 
 | |
|         """ 
 | |
|         so = cls()
 | |
|         try: 
 | |
|             res = func(*args, **kwargs)
 | |
|         finally: 
 | |
|             out, err = so.reset()
 | |
|         return res, out, err 
 | |
|     call = classmethod(call) 
 | |
| 
 | |
|     def reset(self): 
 | |
|         """ reset sys.stdout and sys.stderr and return captured output 
 | |
|             as strings and restore sys.stdout/err.
 | |
|         """
 | |
|         x, y = self.done() 
 | |
|         outerr = x.read(), y.read() 
 | |
|         x.close()
 | |
|         y.close()
 | |
|         return outerr
 | |
| 
 | |
| class StdCaptureFD(Capture): 
 | |
|     """ This class allows to capture writes to FD1 and FD2 
 | |
|         and may connect a NULL file to FD0 (and prevent
 | |
|         reads from sys.stdin)
 | |
|     """
 | |
|     def __init__(self, out=True, err=True, mixed=False, in_=True, patchsys=True): 
 | |
|         if in_:
 | |
|             self._oldin = (sys.stdin, os.dup(0))
 | |
|             sys.stdin  = DontReadFromInput()
 | |
|             fd = os.open(devnullpath, os.O_RDONLY)
 | |
|             os.dup2(fd, 0)
 | |
|             os.close(fd)
 | |
|         if out: 
 | |
|             self.out = py.io.FDCapture(1) 
 | |
|             if patchsys: 
 | |
|                 self.out.setasfile('stdout')
 | |
|         if err: 
 | |
|             if mixed and out:
 | |
|                 tmpfile = self.out.tmpfile 
 | |
|             else:
 | |
|                 tmpfile = None    
 | |
|             self.err = py.io.FDCapture(2, tmpfile=tmpfile) 
 | |
|             if patchsys: 
 | |
|                 self.err.setasfile('stderr')
 | |
| 
 | |
|     def done(self):
 | |
|         """ return (outfile, errfile) and stop capturing. """
 | |
|         if hasattr(self, 'out'): 
 | |
|             outfile = self.out.done() 
 | |
|         else:
 | |
|             outfile = StringIO()
 | |
|         if hasattr(self, 'err'): 
 | |
|             errfile = self.err.done() 
 | |
|         else:
 | |
|             errfile = StringIO()
 | |
|         if hasattr(self, '_oldin'):
 | |
|             oldsys, oldfd = self._oldin 
 | |
|             os.dup2(oldfd, 0)
 | |
|             os.close(oldfd)
 | |
|             sys.stdin = oldsys 
 | |
|         return outfile, errfile 
 | |
| 
 | |
| class StdCapture(Capture):
 | |
|     """ This class allows to capture writes to sys.stdout|stderr "in-memory"
 | |
|         and will raise errors on tries to read from sys.stdin. It only
 | |
|         modifies sys.stdout|stderr|stdin attributes and does not 
 | |
|         touch underlying File Descriptors (use StdCaptureFD for that). 
 | |
|     """
 | |
|     def __init__(self, out=True, err=True, in_=True, mixed=False):
 | |
|         self._out = out
 | |
|         self._err = err 
 | |
|         self._in = in_
 | |
|         if out: 
 | |
|             self.oldout = sys.stdout
 | |
|             sys.stdout = self.newout = StringIO()
 | |
|         if err: 
 | |
|             self.olderr = sys.stderr
 | |
|             if out and mixed: 
 | |
|                 newerr = self.newout 
 | |
|             else:
 | |
|                 newerr = StringIO()
 | |
|             sys.stderr = self.newerr = newerr
 | |
|         if in_:
 | |
|             self.oldin  = sys.stdin
 | |
|             sys.stdin  = self.newin  = DontReadFromInput()
 | |
| 
 | |
|     def done(self): 
 | |
|         """ return (outfile, errfile) and stop capturing. """
 | |
|         o,e = sys.stdout, sys.stderr
 | |
|         if self._out: 
 | |
|             try:
 | |
|                 sys.stdout = self.oldout 
 | |
|             except AttributeError:
 | |
|                 raise IOError("stdout capturing already reset")
 | |
|             del self.oldout
 | |
|             outfile = self.newout
 | |
|             outfile.seek(0)
 | |
|         else:
 | |
|             outfile = StringIO()
 | |
|         if self._err: 
 | |
|             try:
 | |
|                 sys.stderr = self.olderr 
 | |
|             except AttributeError:
 | |
|                 raise IOError("stderr capturing already reset")
 | |
|             del self.olderr 
 | |
|             errfile = self.newerr 
 | |
|             errfile.seek(0)
 | |
|         else:
 | |
|             errfile = StringIO()
 | |
|         if self._in:
 | |
|             sys.stdin = self.oldin 
 | |
|         return outfile, errfile
 | |
| 
 | |
| class DontReadFromInput:
 | |
|     """Temporary stub class.  Ideally when stdin is accessed, the
 | |
|     capturing should be turned off, with possibly all data captured
 | |
|     so far sent to the screen.  This should be configurable, though,
 | |
|     because in automated test runs it is better to crash than
 | |
|     hang indefinitely.
 | |
|     """
 | |
|     def read(self, *args):
 | |
|         raise IOError("reading from stdin while output is captured")
 | |
|     readline = read
 | |
|     readlines = read
 | |
|     __iter__ = read
 | |
| 
 | |
| try:
 | |
|     devnullpath = os.devnull
 | |
| except AttributeError:
 | |
|     if os.name == 'nt':
 | |
|         devnullpath = 'NUL'
 | |
|     else:
 | |
|         devnullpath = '/dev/null'
 | |
| 
 | |
| 
 |