From c3e5ca560ae199d4860d367624d951e2c308f948 Mon Sep 17 00:00:00 2001 From: hpk Date: Tue, 17 Mar 2009 23:41:56 +0100 Subject: [PATCH] [svn r63014] streamlining multichannel interface, fixing test work with -n 3 --HG-- branch : trunk --- py/execnet/gwmanage.py | 49 ++++++++++++++++--- py/execnet/testing/test_gwmanage.py | 54 ++++++++++++++------- py/test/dsession/hostmanage.py | 2 +- py/test/dsession/testing/test_hostmanage.py | 2 +- 4 files changed, 80 insertions(+), 27 deletions(-) diff --git a/py/execnet/gwmanage.py b/py/execnet/gwmanage.py index e95d5023f..bd2d4bb9d 100644 --- a/py/execnet/gwmanage.py +++ b/py/execnet/gwmanage.py @@ -19,7 +19,9 @@ import py import sys, os from py.__.test.dsession.masterslave import MasterNode from py.__.test import event +from py.__.execnet.channel import RemoteError +NO_ENDMARKER_WANTED = object() class GatewaySpec(object): def __init__(self, spec, defaultjoinpath="pyexecnetcache"): @@ -81,20 +83,49 @@ class MultiChannel: def __init__(self, channels): self._channels = channels - def receive_items(self): - items = [] + def send_each(self, item): for ch in self._channels: - items.append((ch, ch.receive())) - return items + ch.send(item) + + def receive_each(self, withchannel=False): + assert not hasattr(self, '_queue') + l = [] + for ch in self._channels: + obj = ch.receive() + if withchannel: + l.append((ch, obj)) + else: + l.append(obj) + return l + + def make_receive_queue(self, endmarker=NO_ENDMARKER_WANTED): + try: + return self._queue + except AttributeError: + self._queue = py.std.Queue.Queue() + for ch in self._channels: + def putreceived(obj, channel=ch): + self._queue.put((channel, obj)) + if endmarker is NO_ENDMARKER_WANTED: + ch.setcallback(putreceived) + else: + ch.setcallback(putreceived, endmarker=endmarker) + return self._queue - def receive(self): - return [x[1] for x in self.receive_items()] def waitclose(self): + first = None for ch in self._channels: - ch.waitclose() + try: + ch.waitclose() + except ch.RemoteError: + if first is None: + first = py.std.sys.exc_info() + if first: + raise first[0], first[1], first[2] class MultiGateway: + RemoteError = RemoteError def __init__(self, gateways): self.gateways = gateways def remote_exec(self, source): @@ -104,6 +135,8 @@ class MultiGateway: return MultiChannel(channels) class GatewayManager: + RemoteError = RemoteError + def __init__(self, specs): self.specs = [GatewaySpec(spec) for spec in specs] self.gateways = [] @@ -118,6 +151,8 @@ class GatewayManager: self.gateways.append(spec.makegateway()) def getgateways(self, remote=True, inplacelocal=True): + if not self.gateways and self.specs: + self.makegateways() l = [] for gw in self.gateways: if gw.spec.inplacelocal(): diff --git a/py/execnet/testing/test_gwmanage.py b/py/execnet/testing/test_gwmanage.py index 84f9fec13..044949b35 100644 --- a/py/execnet/testing/test_gwmanage.py +++ b/py/execnet/testing/test_gwmanage.py @@ -1,7 +1,8 @@ """ tests for - - host specifications - - managing hosts + - gateway specifications + - multi channels and multi gateways + - gateway management - manage rsyncing of hosts """ @@ -25,10 +26,6 @@ class TestGatewaySpec: assert spec.type == "popen" spec2 = GatewaySpec("popen" + joinpath) self._equality(spec, spec2) - if joinpath == "": - assert spec.inplacelocal() - else: - assert not spec.inplacelocal() def test_ssh(self): for prefix in ('ssh:', ''): # ssh is default @@ -44,7 +41,6 @@ class TestGatewaySpec: assert spec.type == "ssh" spec2 = GatewaySpec(specstring) self._equality(spec, spec2) - assert not spec.inplacelocal() def test_socket(self): for hostpart in ('x.y', 'x', 'popen'): @@ -59,7 +55,6 @@ class TestGatewaySpec: assert spec.type == "socket" spec2 = GatewaySpec("socket:" + hostpart + port + joinpath) self._equality(spec, spec2) - assert not spec.inplacelocal() def _equality(self, spec1, spec2): assert spec1 != spec2 @@ -155,16 +150,13 @@ class TestGatewayManagerPopen: testdir.tmpdir.chdir() hellopath = testdir.tmpdir.mkdir("hello") hm.makegateways() - l = [x[1] for x in hm.multi_exec( - "import os ; channel.send(os.getcwd())" - ).receive_items() - ] + l = hm.multi_exec("import os ; channel.send(os.getcwd())").receive_each() paths = [x[1] for x in l] assert l == [str(hellopath)] * 2 - py.test.raises(Exception, 'hm.multi_chdir("world", inplacelocal=False)') + py.test.raises(hm.RemoteError, 'hm.multi_chdir("world", inplacelocal=False)') worldpath = hellopath.mkdir("world") hm.multi_chdir("world", inplacelocal=False) - l = hm.multi_exec("import os ; channel.send(os.getcwd())").receive() + l = hm.multi_exec("import os ; channel.send(os.getcwd())").receive_each() assert len(l) == 2 assert l[0] == l[1] curwd = os.getcwd() @@ -178,12 +170,12 @@ class TestGatewayManagerPopen: hellopath = testdir.tmpdir.mkdir("hello") hm.makegateways() hm.multi_chdir("hello", inplacelocal=False) - l = hm.multi_exec("import os ; channel.send(os.getcwd())").receive() + l = hm.multi_exec("import os ; channel.send(os.getcwd())").receive_each() assert len(l) == 2 assert l == [os.getcwd()] * 2 hm.multi_chdir("hello") - l = hm.multi_exec("import os ; channel.send(os.getcwd())").receive() + l = hm.multi_exec("import os ; channel.send(os.getcwd())").receive_each() assert len(l) == 2 assert l[0] == l[1] curwd = os.getcwd() @@ -192,7 +184,7 @@ class TestGatewayManagerPopen: from py.__.execnet.gwmanage import MultiChannel class TestMultiChannel: - def test_multichannel_receive_items(self): + def test_multichannel_receive_each(self): class pseudochannel: def receive(self): return 12 @@ -200,9 +192,35 @@ class TestMultiChannel: pc1 = pseudochannel() pc2 = pseudochannel() multichannel = MultiChannel([pc1, pc2]) - l = multichannel.receive_items() + l = multichannel.receive_each(withchannel=True) assert len(l) == 2 assert l == [(pc1, 12), (pc2, 12)] + l = multichannel.receive_each(withchannel=False) + assert l == [12,12] + + def test_multichannel_send_each(self): + gm = GatewayManager(['popen'] * 2) + mc = gm.multi_exec(""" + import os + channel.send(channel.receive() + 1) + """) + mc.send_each(41) + l = mc.receive_each() + assert l == [42,42] + + def test_multichannel_receive_queue(self): + gm = GatewayManager(['popen'] * 2) + mc = gm.multi_exec(""" + import os + channel.send(os.getpid()) + """) + queue = mc.make_receive_queue() + ch, item = queue.get(timeout=0.5) + ch2, item2 = queue.get(timeout=0.5) + assert ch != ch2 + assert ch.gateway != ch2.gateway + assert item != item2 + mc.waitclose() def test_multichannel_waitclose(self): l = [] diff --git a/py/test/dsession/hostmanage.py b/py/test/dsession/hostmanage.py index 88fdfef36..db5124e0a 100644 --- a/py/test/dsession/hostmanage.py +++ b/py/test/dsession/hostmanage.py @@ -51,7 +51,7 @@ class HostManager(object): for ch, result in self.gwmanager.multi_exec(""" import sys, os channel.send((sys.executable, os.getcwd(), sys.path)) - """).receive_items(): + """).receive_each(withchannel=True): self.trace("spec %r, execuable %r, cwd %r, syspath %r" %( ch.gateway.spec, result[0], result[1], result[2])) diff --git a/py/test/dsession/testing/test_hostmanage.py b/py/test/dsession/testing/test_hostmanage.py index b79493d22..7b5dcd2a8 100644 --- a/py/test/dsession/testing/test_hostmanage.py +++ b/py/test/dsession/testing/test_hostmanage.py @@ -39,7 +39,7 @@ class TestHostManager: hm = HostManager(config, hosts=["popen:%s" % dest]) assert hm.config.topdir == source == config.topdir hm.rsync_roots() - p, = hm.gwmanager.multi_exec("import os ; channel.send(os.getcwd())").receive() + p, = hm.gwmanager.multi_exec("import os ; channel.send(os.getcwd())").receive_each() p = py.path.local(p) print "remote curdir", p assert p == dest.join(config.topdir.basename)