From 764b85f23edcd9e449e1338e6406e4ba73a5fc6c Mon Sep 17 00:00:00 2001 From: hpk Date: Fri, 20 Mar 2009 14:31:02 +0100 Subject: [PATCH] [svn r63136] better grouping of gateway public API --HG-- branch : trunk --- py/execnet/gateway.py | 92 ++++++++++++++++++++++--------------------- 1 file changed, 47 insertions(+), 45 deletions(-) diff --git a/py/execnet/gateway.py b/py/execnet/gateway.py index 2e3b2d08b..db941fc87 100644 --- a/py/execnet/gateway.py +++ b/py/execnet/gateway.py @@ -202,28 +202,6 @@ class Gateway(object): if joining: self.join() - def remote_init_threads(self, num=None): - """ start up to 'num' threads for subsequent - remote_exec() invocations to allow concurrent - execution. - """ - if hasattr(self, '_remotechannelthread'): - raise IOError("remote threads already running") - from py.__.thread import pool - source = py.code.Source(pool, """ - execpool = WorkerPool(maxthreads=%r) - gw = channel.gateway - while 1: - task = gw._requestqueue.get() - if task is None: - gw._stopsend() - execpool.shutdown() - execpool.join() - raise gw._StopExecLoop - execpool.dispatch(gw._executetask, task) - """ % num) - self._remotechannelthread = self.remote_exec(source) - def _executetask(self, item): """ execute channel/source items. """ from sys import exc_info @@ -268,10 +246,6 @@ class Gateway(object): # High Level Interface # _____________________________________________________________________ # - def newchannel(self): - """ return new channel object. """ - return self._channelfactory.new() - def remote_exec(self, source, stdout=None, stderr=None): """ return channel object and connect it to a remote execution thread where the given 'source' executes @@ -295,6 +269,53 @@ class Gateway(object): channel.id, (source, outid, errid))) return channel + def remote_init_threads(self, num=None): + """ start up to 'num' threads for subsequent + remote_exec() invocations to allow concurrent + execution. + """ + if hasattr(self, '_remotechannelthread'): + raise IOError("remote threads already running") + from py.__.thread import pool + source = py.code.Source(pool, """ + execpool = WorkerPool(maxthreads=%r) + gw = channel.gateway + while 1: + task = gw._requestqueue.get() + if task is None: + gw._stopsend() + execpool.shutdown() + execpool.join() + raise gw._StopExecLoop + execpool.dispatch(gw._executetask, task) + """ % num) + self._remotechannelthread = self.remote_exec(source) + + def newchannel(self): + """ return new channel object. """ + return self._channelfactory.new() + + def join(self, joinexec=True): + """ Wait for all IO (and by default all execution activity) + to stop. the joinexec parameter is obsolete. + """ + current = threading.currentThread() + if self._receiverthread.isAlive(): + self._trace("joining receiver thread") + self._receiverthread.join() + + def exit(self): + """ Try to stop all exec and IO activity. """ + self._cleanup.unregister(self) + self._stopexec() + self._stopsend() + try: + py._com.pyplugins.notify("gateway_exit", self) + except NameError: + # XXX on the remote side 'py' is not imported + # and so we can't notify + pass + def _remote_redirect(self, stdout=None, stderr=None): """ return a handle representing a redirection of a remote end's stdout to a local file object. with handle.close() @@ -325,17 +346,6 @@ class Gateway(object): c.waitclose() return Handle() - def exit(self): - """ Try to stop all exec and IO activity. """ - self._cleanup.unregister(self) - self._stopexec() - self._stopsend() - try: - py._com.pyplugins.notify("gateway_exit", self) - except NameError: - # on the remote side 'py' is not imported - # and so we can't notify (XXX: make execnet synchronous) - pass def _stopsend(self): self._send(None) @@ -344,14 +354,6 @@ class Gateway(object): if self._requestqueue is not None: self._requestqueue.put(None) - def join(self, joinexec=True): - """ Wait for all IO (and by default all execution activity) - to stop. the joinexec parameter is obsolete. - """ - current = threading.currentThread() - if self._receiverthread.isAlive(): - self._trace("joining receiver thread") - self._receiverthread.join() def getid(gw, cache={}): name = gw.__class__.__name__