[svn r63203] add a method that allows to send an item to multiple nodes
--HG-- branch : trunk
This commit is contained in:
parent
5f0cabb295
commit
3305a0e3db
|
@ -170,7 +170,6 @@ class DSession(Session):
|
||||||
l.remove(node)
|
l.remove(node)
|
||||||
if not l:
|
if not l:
|
||||||
del self.item2nodes[item]
|
del self.item2nodes[item]
|
||||||
|
|
||||||
return pending
|
return pending
|
||||||
|
|
||||||
def triggertesting(self, colitems):
|
def triggertesting(self, colitems):
|
||||||
|
@ -183,10 +182,29 @@ class DSession(Session):
|
||||||
self.bus.notify("collectionstart", event.CollectionStart(next))
|
self.bus.notify("collectionstart", event.CollectionStart(next))
|
||||||
self.queueevent("collectionreport", basic_collect_report(next))
|
self.queueevent("collectionreport", basic_collect_report(next))
|
||||||
self.senditems(senditems)
|
self.senditems(senditems)
|
||||||
|
#self.senditems_each(senditems)
|
||||||
|
|
||||||
def queueevent(self, eventname, *args, **kwargs):
|
def queueevent(self, eventname, *args, **kwargs):
|
||||||
self.queue.put((eventname, args, kwargs))
|
self.queue.put((eventname, args, kwargs))
|
||||||
|
|
||||||
|
def senditems_each(self, tosend):
|
||||||
|
if not tosend:
|
||||||
|
return
|
||||||
|
room = self.MAXITEMSPERHOST
|
||||||
|
for node, pending in self.node2pending.items():
|
||||||
|
room = min(self.MAXITEMSPERHOST - len(pending), room)
|
||||||
|
sending = tosend[:room]
|
||||||
|
for node, pending in self.node2pending.items():
|
||||||
|
node.sendlist(sending)
|
||||||
|
pending.extend(sending)
|
||||||
|
for item in sending:
|
||||||
|
self.item2nodes.setdefault(item, []).append(node)
|
||||||
|
self.bus.notify("itemstart", item, node)
|
||||||
|
tosend[:] = tosend[room:] # update inplace
|
||||||
|
if tosend:
|
||||||
|
# we have some left, give it to the main loop
|
||||||
|
self.queueevent("rescheduleitems", event.RescheduleItems(tosend))
|
||||||
|
|
||||||
def senditems(self, tosend):
|
def senditems(self, tosend):
|
||||||
if not tosend:
|
if not tosend:
|
||||||
return
|
return
|
||||||
|
@ -221,6 +239,7 @@ class DSession(Session):
|
||||||
def handle_crashitem(self, item, node):
|
def handle_crashitem(self, item, node):
|
||||||
longrepr = "!!! Node %r crashed during running of test %r" %(node, item)
|
longrepr = "!!! Node %r crashed during running of test %r" %(node, item)
|
||||||
rep = event.ItemTestReport(item, when="???", excinfo=longrepr)
|
rep = event.ItemTestReport(item, when="???", excinfo=longrepr)
|
||||||
|
rep.node = node
|
||||||
self.bus.notify("itemtestreport", rep)
|
self.bus.notify("itemtestreport", rep)
|
||||||
|
|
||||||
def setup(self):
|
def setup(self):
|
||||||
|
|
|
@ -42,6 +42,24 @@ class TestDSession:
|
||||||
l = session.removenode(node)
|
l = session.removenode(node)
|
||||||
assert not l
|
assert not l
|
||||||
|
|
||||||
|
def test_send_remove_to_two_nodes(self, testdir):
|
||||||
|
item = testdir.getitem("def test_func(): pass")
|
||||||
|
node1 = MockNode()
|
||||||
|
node2 = MockNode()
|
||||||
|
session = DSession(item.config)
|
||||||
|
session.addnode(node1)
|
||||||
|
session.addnode(node2)
|
||||||
|
session.senditems_each([item])
|
||||||
|
assert session.node2pending[node1] == [item]
|
||||||
|
assert session.node2pending[node2] == [item]
|
||||||
|
assert node1 in session.item2nodes[item]
|
||||||
|
assert node2 in session.item2nodes[item]
|
||||||
|
session.removeitem(item, node1)
|
||||||
|
assert session.item2nodes[item] == [node2]
|
||||||
|
session.removeitem(item, node2)
|
||||||
|
assert not session.node2pending[node1]
|
||||||
|
assert not session.item2nodes
|
||||||
|
|
||||||
def test_senditems_removeitems(self, testdir):
|
def test_senditems_removeitems(self, testdir):
|
||||||
item = testdir.getitem("def test_func(): pass")
|
item = testdir.getitem("def test_func(): pass")
|
||||||
node = MockNode()
|
node = MockNode()
|
||||||
|
|
Loading…
Reference in New Issue