88 lines
		
	
	
		
			2.8 KiB
		
	
	
	
		
			Python
		
	
	
	
			
		
		
	
	
			88 lines
		
	
	
		
			2.8 KiB
		
	
	
	
		
			Python
		
	
	
	
| """
 | |
| Node code for Master. 
 | |
| """
 | |
| import py
 | |
| from py.__.test.rsession.outcome import ReprOutcome
 | |
| from py.__.test.rsession import report
 | |
| 
 | |
| class MasterNode(object):
 | |
|     def __init__(self, channel, reporter, done_dict):
 | |
|         self.channel = channel
 | |
|         self.reporter = reporter
 | |
|         
 | |
|         def callback(outcome):
 | |
|             item = self.pending.pop()
 | |
|             if not item in done_dict:
 | |
|                 self.receive_result(outcome, item)
 | |
|                 done_dict[item] = True
 | |
|         channel.setcallback(callback)
 | |
|         self.pending = []
 | |
| 
 | |
|     def receive_result(self, outcomestring, item):
 | |
|         repr_outcome = ReprOutcome(outcomestring)
 | |
|         # send finish report
 | |
|         self.reporter(report.ReceivedItemOutcome(
 | |
|                         self.channel, item, repr_outcome))
 | |
| 
 | |
|     def send(self, item):
 | |
|         if item is StopIteration:
 | |
|             self.channel.send(42)
 | |
|         else:
 | |
|             self.pending.insert(0, item)
 | |
|             #itemspec = item.listnames()[1:]
 | |
|             self.channel.send(item.get_collector_trail())
 | |
|             # send start report
 | |
|             self.reporter(report.SendItem(self.channel, item))
 | |
| 
 | |
| def itemgen(colitems, reporter, keyword, reporterror):
 | |
|     for x in colitems:
 | |
|         for y in x.tryiter(reporterror = lambda x: reporterror(reporter, x), keyword = keyword):
 | |
|             yield y
 | |
| 
 | |
| def randomgen(items, done_dict):
 | |
|     """ Generator, which randomly gets all the tests from items as long
 | |
|     as they're not in done_dict
 | |
|     """
 | |
|     import random
 | |
|     while items:
 | |
|         values = items.keys()
 | |
|         item = values[int(random.random()*len(values))]
 | |
|         if item in done_dict:
 | |
|             del items[item]
 | |
|         else:
 | |
|             yield item
 | |
| 
 | |
| def dispatch_loop(masternodes, itemgenerator, shouldstop, 
 | |
|                   waiter = lambda: py.std.time.sleep(0.1),
 | |
|                   max_tasks_per_node=None):
 | |
|     if not max_tasks_per_node:
 | |
|         max_tasks_per_node = py.test.config.getvalue("dist_taskspernode")
 | |
|     all_tests = {}
 | |
|     while 1:
 | |
|         try:
 | |
|             for node in masternodes:
 | |
|                 if len(node.pending) < max_tasks_per_node:
 | |
|                     item = itemgenerator.next()
 | |
|                     all_tests[item] = True
 | |
|                     if shouldstop():
 | |
|                         for _node in masternodes:
 | |
|                             _node.send(StopIteration) # magic connector
 | |
|                         return None
 | |
|                     node.send(item)
 | |
|         except StopIteration:
 | |
|             break
 | |
|         waiter()
 | |
|     return all_tests
 | |
| 
 | |
| def setup_slave(gateway, pkgpath, config):
 | |
|     from py.__.test.rsession import slave
 | |
|     import os
 | |
|     ch = gateway.remote_exec(str(py.code.Source(slave.setup, "setup()")))
 | |
|     #if hasattr(gateway, 'sshaddress'):
 | |
|     #    assert not os.path.isabs(pkgpath)
 | |
|     ch.send(str(pkgpath))
 | |
|     ch.send(config.make_repr(defaultconftestnames))
 | |
|     return ch
 | |
| 
 | |
| defaultconftestnames = ['dist_nicelevel']
 |