diff --git a/py/__init__.py b/py/__init__.py index 4ed60afb9..4b5f1d528 100644 --- a/py/__init__.py +++ b/py/__init__.py @@ -27,8 +27,8 @@ version = "1.0.0a1" initpkg(__name__, description = "pylib and py.test: agile development and test support library", - revision = int('$LastChangedRevision: 63027 $'.split(':')[1][:-1]), - lastchangedate = '$LastChangedDate: 2009-03-18 12:18:39 +0100 (Wed, 18 Mar 2009) $', + revision = int('$LastChangedRevision: 63089 $'.split(':')[1][:-1]), + lastchangedate = '$LastChangedDate: 2009-03-19 18:05:41 +0100 (Thu, 19 Mar 2009) $', version = version, url = "http://pylib.org", download_url = "http://codespeak.net/py/0.9.2/download.html", @@ -152,8 +152,8 @@ initpkg(__name__, 'execnet.PopenGateway' : ('./execnet/register.py', 'PopenGateway'), 'execnet.SshGateway' : ('./execnet/register.py', 'SshGateway'), 'execnet.GatewaySpec' : ('./execnet/gwmanage.py', 'GatewaySpec'), - 'execnet.MultiGateway' : ('./execnet/gwmanage.py', 'MultiGateway'), - 'execnet.MultiChannel' : ('./execnet/gwmanage.py', 'MultiChannel'), + 'execnet.MultiGateway' : ('./execnet/multi.py', 'MultiGateway'), + 'execnet.MultiChannel' : ('./execnet/multi.py', 'MultiChannel'), # execnet scripts 'execnet.RSync' : ('./execnet/rsync.py', 'RSync'), diff --git a/py/execnet/gwmanage.py b/py/execnet/gwmanage.py index ed8614333..cb446aa8b 100644 --- a/py/execnet/gwmanage.py +++ b/py/execnet/gwmanage.py @@ -94,64 +94,6 @@ class GatewaySpec(object): gw.spec = self return gw -class MultiChannel: - def __init__(self, channels): - self._channels = channels - - def send_each(self, item): - for ch in self._channels: - ch.send(item) - - def receive_each(self, withchannel=False): - assert not hasattr(self, '_queue') - l = [] - for ch in self._channels: - obj = ch.receive() - if withchannel: - l.append((ch, obj)) - else: - l.append(obj) - return l - - def make_receive_queue(self, endmarker=NO_ENDMARKER_WANTED): - try: - return self._queue - except AttributeError: - self._queue = py.std.Queue.Queue() - for ch in self._channels: - def putreceived(obj, channel=ch): - self._queue.put((channel, obj)) - if endmarker is NO_ENDMARKER_WANTED: - ch.setcallback(putreceived) - else: - ch.setcallback(putreceived, endmarker=endmarker) - return self._queue - - - def waitclose(self): - first = None - for ch in self._channels: - try: - ch.waitclose() - except ch.RemoteError: - if first is None: - first = py.std.sys.exc_info() - if first: - raise first[0], first[1], first[2] - -class MultiGateway: - RemoteError = RemoteError - def __init__(self, gateways): - self.gateways = gateways - def remote_exec(self, source): - channels = [] - for gw in self.gateways: - channels.append(gw.remote_exec(source)) - return MultiChannel(channels) - def exit(self): - for gw in self.gateways: - gw.exit() - class GatewayManager: RemoteError = RemoteError @@ -179,7 +121,7 @@ class GatewayManager: else: if remote: l.append(gw) - return MultiGateway(gateways=l) + return py.execnet.MultiGateway(gateways=l) def multi_exec(self, source, inplacelocal=True): """ remote execute code on all gateways. diff --git a/py/execnet/multi.py b/py/execnet/multi.py new file mode 100644 index 000000000..f08a02db3 --- /dev/null +++ b/py/execnet/multi.py @@ -0,0 +1,68 @@ +""" + Working with multiple channels and gateways + +""" +import py +from py.__.execnet.channel import RemoteError + +NO_ENDMARKER_WANTED = object() + +class MultiGateway: + RemoteError = RemoteError + def __init__(self, gateways): + self.gateways = gateways + def remote_exec(self, source): + channels = [] + for gw in self.gateways: + channels.append(gw.remote_exec(source)) + return MultiChannel(channels) + def exit(self): + for gw in self.gateways: + gw.exit() + +class MultiChannel: + def __init__(self, channels): + self._channels = channels + + def send_each(self, item): + for ch in self._channels: + ch.send(item) + + def receive_each(self, withchannel=False): + assert not hasattr(self, '_queue') + l = [] + for ch in self._channels: + obj = ch.receive() + if withchannel: + l.append((ch, obj)) + else: + l.append(obj) + return l + + def make_receive_queue(self, endmarker=NO_ENDMARKER_WANTED): + try: + return self._queue + except AttributeError: + self._queue = py.std.Queue.Queue() + for ch in self._channels: + def putreceived(obj, channel=ch): + self._queue.put((channel, obj)) + if endmarker is NO_ENDMARKER_WANTED: + ch.setcallback(putreceived) + else: + ch.setcallback(putreceived, endmarker=endmarker) + return self._queue + + + def waitclose(self): + first = None + for ch in self._channels: + try: + ch.waitclose() + except ch.RemoteError: + if first is None: + first = py.std.sys.exc_info() + if first: + raise first[0], first[1], first[2] + +