71 lines
		
	
	
		
			2.3 KiB
		
	
	
	
		
			Python
		
	
	
	
			
		
		
	
	
			71 lines
		
	
	
		
			2.3 KiB
		
	
	
	
		
			Python
		
	
	
	
"""
Node code for Master: dispatches test items to remote nodes over their
channels and reports the outcomes that come back.
"""
 | 
						|
import py
 | 
						|
from py.__.test.rsession.outcome import ReprOutcome
 | 
						|
from py.__.test.rsession import repevent
 | 
						|
 | 
						|
class MasterNode(object):
    """Master-side handle for a single remote node reached through a channel.

    Items passed to ``send`` are queued in ``pending`` and later paired,
    oldest first, with the outcomes the channel delivers to ``_callback``.
    Progress is announced by calling ``reporter`` with ``repevent`` objects.
    """

    def __init__(self, channel, reporter):
        """Store the channel/reporter pair and register the outcome callback.

        channel  -- object providing ``send`` and ``setcallback``
        reporter -- callable invoked with one event object per notification
        """
        self.channel = channel
        self.reporter = reporter
        # Items sent but not yet answered; index 0 holds the newest one.
        self.pending = []
        channel.setcallback(self._callback)

    def _callback(self, outcome):
        """Handle one outcome delivered by the channel."""
        # New items are inserted at the front, so popping from the back
        # yields the oldest outstanding item (FIFO pairing).
        item = self.pending.pop()
        self.receive_result(outcome, item)

    def receive_result(self, outcomestring, item):
        """Wrap ``outcomestring`` in a ReprOutcome and report it for ``item``."""
        repr_outcome = ReprOutcome(outcomestring)
        # send finish report
        self.reporter(repevent.ReceivedItemOutcome(
            self.channel, item, repr_outcome))

    def send(self, item):
        """Send ``item`` to the remote side, or the stop marker.

        Passing the ``StopIteration`` class itself sends the integer 42 over
        the channel instead of an item (presumably the remote side's
        shutdown signal -- confirm against the slave code); nothing is queued
        or reported in that case.  Any other item is queued in ``pending``,
        its collector trail is sent, and a ``SendItem`` event is reported.

        Raises IOError (re-raised after printing diagnostics) when the
        channel fails while sending.
        """
        try:
            if item is StopIteration:
                self.channel.send(42)
            else:
                # Queue before sending so the item is already in ``pending``
                # when the outcome callback pops it.
                self.pending.insert(0, item)
                self.channel.send(item._get_collector_trail())
                # send start report
                self.reporter(repevent.SendItem(self.channel, item))
        except IOError:
            # Single-argument parenthesised form prints identically on
            # Python 2 and is valid syntax on Python 3.
            print("Sending error, channel IOError")
            print(self.channel._getremoteerror())
            # XXX: this should go as soon as we'll have proper detection
            #      of hanging nodes and such
            raise
 | 
						|
 | 
						|
def itemgen(colitems, reporter, keyword, reporterror):
    """Lazily yield every item produced by the given collectors, in order.

    colitems    -- iterable of objects providing ``_tryiter``
    reporter    -- passed through as first argument to ``reporterror``
    keyword     -- forwarded unchanged to each ``_tryiter`` call
    reporterror -- callable ``(reporter, x)``; ``_tryiter`` receives a
                   one-argument wrapper that supplies ``reporter`` itself
    """
    def report(error):
        # Bind ``reporter`` so _tryiter only has to pass the error value.
        reporterror(reporter, error)

    for collector in colitems:
        for item in collector._tryiter(reporterror=report, keyword=keyword):
            yield item
 | 
						|
 | 
						|
def dispatch_loop(masternodes, itemgenerator, shouldstop,
                  waiter = lambda: py.std.time.sleep(0.1),
                  max_tasks_per_node=None):
    """Feed items from ``itemgenerator`` to nodes that have spare capacity.

    masternodes        -- nodes exposing ``pending`` (sized) and ``send``
    itemgenerator      -- iterator of items to distribute
    shouldstop         -- zero-argument callable, checked after each item
                          is fetched; a true result aborts the dispatch
    waiter             -- called once after every pass over all nodes
                          (default sleeps 0.1s)
    max_tasks_per_node -- a node receives an item only while
                          ``len(node.pending)`` is below this value; a falsy
                          value is replaced by the ``dist_taskspernode``
                          config option

    Returns a dict mapping every fetched item to True once the generator is
    exhausted.  When ``shouldstop()`` is true, ``StopIteration`` is sent to
    every node and None is returned; the item fetched just before that check
    is not sent to any node.
    """
    if not max_tasks_per_node:
        max_tasks_per_node = py.test.config.getvalue("dist_taskspernode")
    all_tests = {}
    while 1:
        try:
            for node in masternodes:
                if len(node.pending) < max_tasks_per_node:
                    # builtin next() works on Python 2.6+ and Python 3,
                    # unlike the Python-2-only ``.next()`` method.
                    item = next(itemgenerator)
                    all_tests[item] = True
                    if shouldstop():
                        for _node in masternodes:
                            _node.send(StopIteration) # magic connector
                        return None
                    node.send(item)
        except StopIteration:
            # Generator exhausted: everything has been handed out.
            break
        waiter()
    return all_tests
 | 
						|
 |