148 lines
		
	
	
		
			4.4 KiB
		
	
	
	
		
			Python
		
	
	
	
			
		
		
	
	
			148 lines
		
	
	
		
			4.4 KiB
		
	
	
	
		
			Python
		
	
	
	
"""
 | 
						|
Node code for slaves. 
 | 
						|
"""
 | 
						|
 | 
						|
import py
 | 
						|
from py.__.test.rsession.executor import RunExecutor, BoxExecutor, AsyncExecutor
 | 
						|
from py.__.test.rsession.outcome import Outcome
 | 
						|
from py.__.test.outcome import Skipped
 | 
						|
import thread
 | 
						|
import os
 | 
						|
 | 
						|
class PidInfo(object):
    """ Lock-protected container for the pid of the child process that is
    currently running.

    ``pid`` is 0 whenever no child is being tracked.  Every access goes
    through ``lock`` so that ``kill`` and ``waitandclear`` never act on the
    pid at the same time.
    """
    def __init__(self):
        self.pid = 0
        self.lock = thread.allocate_lock()

    def set_pid(self, pid):
        """ Remember ``pid`` as the running child. """
        self.lock.acquire()
        try:
            self.pid = pid
        finally:
            self.lock.release()

    def kill(self):
        """ Send signal 15 to the remembered child, if any, and forget it.

        Does nothing when no pid is stored.
        """
        self.lock.acquire()
        try:
            if self.pid:
                # 15 is SIGTERM on POSIX systems
                os.kill(self.pid, 15)
                self.pid = 0
        finally:
            self.lock.release()

    def waitandclear(self, pid, num):
        """ Block until the child has exited, forget its pid and return
        whatever ``os.waitpid`` returned.

        This is an obscure hack to keep locking properly and adhere to
        posix semantics: the lock is held for the whole wait, so ``kill``
        cannot signal a pid that has already been reaped.

        ``pid`` is used only when no pid is stored (for example because
        ``kill`` already cleared it); otherwise the stored pid wins.
        ``num`` is ignored, the wait always uses options 0.
        """
        self.lock.acquire()
        try:
            # Never hand 0 to waitpid when we know the real child:
            # waitpid(0, ...) waits for *any* child in our process group.
            target = self.pid or pid
            retval = os.waitpid(target, 0)
            self.pid = 0
            return retval
        finally:
            self.lock.release()
 | 
						|
 | 
						|
class SlaveNode(object):
    """ Runs single test items on the slave side with a given executor
    class and reports their outcome.
    """
    def __init__(self, config, pidinfo, executor=AsyncExecutor):
        self.pidinfo = pidinfo
        self.executor = executor
        self.config = config

    def execute(self, itemspec):
        """ Resolve ``itemspec`` through the config and execute it.

        With ``AsyncExecutor`` the executor hands back a continuation and
        the child's pid; the pid is stored in ``pidinfo`` and the
        continuation is invoked with ``pidinfo.waitandclear``.  Any other
        executor is simply run and its result returned.
        """
        collected = self.config._getcollector(itemspec)
        runner = self.executor(collected, config=self.config)
        if self.executor is not AsyncExecutor:
            # for tests only
            return runner.execute()
        resume, child_pid = runner.execute()
        self.pidinfo.set_pid(child_pid)
        return resume(self.pidinfo.waitandclear)

    def run(self, itemspec):
        """ Execute ``itemspec`` and return its outcome.

        When the executor class has a true ``wraps`` attribute the result
        of ``execute`` is returned as is; otherwise it is converted with
        ``make_repr`` using the configured traceback style.
        """
        result = self.execute(itemspec)
        if not self.executor.wraps:
            return result.make_repr(self.config.option.tbstyle)
        return result
 | 
						|
 | 
						|
def slave_main(receive, send, path, config, pidinfo):
 | 
						|
    import os
 | 
						|
    assert os.path.exists(path) 
 | 
						|
    path = os.path.abspath(path) 
 | 
						|
    nodes = {}
 | 
						|
    def getnode(item):
 | 
						|
        node = nodes.get(item[0], None)
 | 
						|
        if node is not None:
 | 
						|
            return node
 | 
						|
        col = py.test.collect.Directory(str(py.path.local(path).join(item[0])))
 | 
						|
        node = nodes[item[0]] = SlaveNode(config, pidinfo)
 | 
						|
        return node
 | 
						|
    while 1:
 | 
						|
        nextitem = receive()
 | 
						|
        if nextitem is None:
 | 
						|
            break
 | 
						|
        try:
 | 
						|
            node = getnode(nextitem)
 | 
						|
            res = node.run(nextitem)
 | 
						|
        except Skipped, s:
 | 
						|
            send(Outcome(skipped=str(s)).make_repr())
 | 
						|
        except:
 | 
						|
            excinfo = py.code.ExceptionInfo()
 | 
						|
            send(Outcome(excinfo=excinfo, is_critical=True).make_repr())
 | 
						|
        else:
 | 
						|
            if not res[0] and not res[3] and config.option.exitfirst:
 | 
						|
                # we're finished, but need to eat what we can
 | 
						|
                send(res)
 | 
						|
                break
 | 
						|
            send(res)
 | 
						|
    
 | 
						|
    while nextitem is not None:
 | 
						|
        nextitem = receive()
 | 
						|
 | 
						|
# Names handed to config.make_repr() in setup_slave() when the config
# representation for a slave is built.
defaultconftestnames = ['dist_nicelevel']
 | 
						|
def setup_slave(host, config):
    """ Start the slave bootstrap on ``host`` and return its channel.

    The source of ``setup`` followed by a ``setup()`` call is executed
    remotely through ``host.gw``; afterwards the remote base path and the
    config representation are sent over the new channel, in that order.
    """
    bootstrap = str(py.code.Source(setup, "setup()"))
    channel = host.gw.remote_exec(bootstrap)
    configrepr = config.make_repr(defaultconftestnames)
    # the remote setup() reads the base directory first, then the config
    for payload in (host.gw_remotepath, configrepr):
        channel.send(payload)
    return channel
 | 
						|
 | 
						|
def setup():
    """ Bootstrap code whose source is shipped to the slave by
    ``setup_slave`` and executed there.

    ``channel`` is not defined in this function; presumably it is
    provided as a global by the gateway's remote execution -- confirm.
    The function has to stay self-contained because only its own source
    is sent across.
    """
    # our current dir is the topdir
    import os, sys

    def callback_gen(channel, queue, info):
        # build the receiver callback that feeds incoming items into queue
        def callback(item):
            if item == 42: # magic call-cleanup
                # terminate the running child, shut the channel and exit
                info.kill()
                channel.close()
                sys.exit(0)
            else:
                queue.put(item)
        return callback

    basedir = channel.receive()
    config_repr = channel.receive()

    # setup defaults...
    sys.path.insert(0, basedir)
    import py
    config = py.test.config
    assert not config._initialized 
    config.initdirect(basedir, config_repr)

    if not config.option.nomagic:
        py.magic.invoke(assertion=1)

    from py.__.test.rsession.slave import slave_main, PidInfo
    pidinfo = PidInfo()
    queue = py.std.Queue.Queue()
    channel.setcallback(callback_gen(channel, queue, pidinfo))
    # items arrive through the callback, results go straight to the channel
    slave_main(queue.get, channel.send, basedir, config, pidinfo)

    if not config.option.nomagic:
        py.magic.revoke(assertion=1)
    channel.close()
 |