76 lines
		
	
	
		
			2.1 KiB
		
	
	
	
		
			Python
		
	
	
	
			
		
		
	
	
			76 lines
		
	
	
		
			2.1 KiB
		
	
	
	
		
			Python
		
	
	
	
 | 
						|
import os, sys
 | 
						|
import py
 | 
						|
 | 
						|
class FDCapture: 
 | 
						|
    """ Capture IO to/from a given os-level filedescriptor. """
 | 
						|
    def __init__(self, targetfd, tmpfile=None): 
 | 
						|
        self.targetfd = targetfd
 | 
						|
        if tmpfile is None: 
 | 
						|
            tmpfile = self.maketmpfile()
 | 
						|
        self.tmpfile = tmpfile 
 | 
						|
        self._savefd = os.dup(targetfd)
 | 
						|
        os.dup2(self.tmpfile.fileno(), targetfd) 
 | 
						|
        self._patched = []
 | 
						|
 | 
						|
    def setasfile(self, name, module=sys): 
 | 
						|
        key = (module, name)
 | 
						|
        self._patched.append((key, getattr(module, name)))
 | 
						|
        setattr(module, name, self.tmpfile) 
 | 
						|
 | 
						|
    def unsetfiles(self): 
 | 
						|
        while self._patched: 
 | 
						|
            (module, name), value = self._patched.pop()
 | 
						|
            setattr(module, name, value) 
 | 
						|
 | 
						|
    def done(self): 
 | 
						|
        os.dup2(self._savefd, self.targetfd) 
 | 
						|
        self.unsetfiles() 
 | 
						|
        os.close(self._savefd) 
 | 
						|
        self.tmpfile.seek(0)
 | 
						|
        return self.tmpfile 
 | 
						|
 | 
						|
    def maketmpfile(self): 
 | 
						|
        f = os.tmpfile()
 | 
						|
        newf = py.io.dupfile(f) 
 | 
						|
        f.close()
 | 
						|
        return newf 
 | 
						|
 | 
						|
class OutErrCapture: 
 | 
						|
    """ capture Stdout and Stderr both on filedescriptor 
 | 
						|
        and sys.stdout/stderr level. 
 | 
						|
    """
 | 
						|
    def __init__(self, out=True, err=True, patchsys=True): 
 | 
						|
        if out: 
 | 
						|
            self.out = FDCapture(1) 
 | 
						|
            if patchsys: 
 | 
						|
                self.out.setasfile('stdout')
 | 
						|
        if err: 
 | 
						|
            self.err = FDCapture(2) 
 | 
						|
            if patchsys: 
 | 
						|
                self.err.setasfile('stderr')
 | 
						|
 | 
						|
    def reset(self): 
 | 
						|
        out = err = ""
 | 
						|
        if hasattr(self, 'out'): 
 | 
						|
            outfile = self.out.done() 
 | 
						|
            out = outfile.read()
 | 
						|
        if hasattr(self, 'err'): 
 | 
						|
            errfile = self.err.done() 
 | 
						|
            err = errfile.read()
 | 
						|
        return out, err 
 | 
						|
 | 
						|
def callcapture(func, *args, **kwargs): 
 | 
						|
    """ call the given function with args/kwargs
 | 
						|
        and return a (res, out, err) tuple where
 | 
						|
        out and err represent the output/error output
 | 
						|
        during function execution. 
 | 
						|
    """ 
 | 
						|
    so = OutErrCapture()
 | 
						|
    try: 
 | 
						|
        res = func(*args, **kwargs)
 | 
						|
    finally: 
 | 
						|
        out, err = so.reset()
 | 
						|
    return res, out, err 
 | 
						|
 |