[svn r45539] merge the execnet lessthreads branch (using the branch'es history):

* now by default Gateways DO NOT SPAWN execution threads
  you can call "remote_init_threads(NUM)" on an already instantiated
  gateway, which will install a loop on the other side which will
  dispatch each execution task to its own thread.

* execution is dissallowed on the side which initiates a gateway
  (rarely used, anyway)

* some cleanups (hopefully)

--HG--
branch : trunk
This commit is contained in:
hpk 2007-08-07 19:34:59 +02:00
parent f80336f076
commit a5e69d2035
4 changed files with 152 additions and 79 deletions

View File

@ -22,33 +22,38 @@ if 'ThreadOut' not in globals():
from py.__.execnet.channel import ChannelFactory, Channel from py.__.execnet.channel import ChannelFactory, Channel
from py.__.execnet.message import Message from py.__.execnet.message import Message
ThreadOut = py._thread.ThreadOut ThreadOut = py._thread.ThreadOut
WorkerPool = py._thread.WorkerPool
NamedThreadPool = py._thread.NamedThreadPool
import os import os
debug = 0 # open('/tmp/execnet-debug-%d' % os.getpid() , 'wa') debug = open('/tmp/execnet-debug-%d' % os.getpid() , 'wa')
sysex = (KeyboardInterrupt, SystemExit) sysex = (KeyboardInterrupt, SystemExit)
class StopExecLoop(Exception):
pass
class Gateway(object): class Gateway(object):
_ThreadOut = ThreadOut _ThreadOut = ThreadOut
remoteaddress = "" remoteaddress = ""
def __init__(self, io, execthreads=None, _startcount=2): _requestqueue = None
def __init__(self, io, _startcount=2):
""" initialize core gateway, using the given """ initialize core gateway, using the given
inputoutput object and 'execthreads' execution inputoutput object.
threads.
""" """
global registered_cleanup global registered_cleanup, _activegateways
self._execpool = WorkerPool(maxthreads=execthreads)
self._io = io self._io = io
self._outgoing = Queue.Queue()
self._channelfactory = ChannelFactory(self, _startcount) self._channelfactory = ChannelFactory(self, _startcount)
if not registered_cleanup: if not registered_cleanup:
atexit.register(cleanup_atexit) atexit.register(cleanup_atexit)
registered_cleanup = True registered_cleanup = True
_active_sendqueues[self._outgoing] = True _activegateways[self] = True
self._pool = NamedThreadPool(receiver = self._thread_receiver,
sender = self._thread_sender) def _initreceive(self, requestqueue=False):
if requestqueue:
self._requestqueue = Queue.Queue()
self._receiverthread = threading.Thread(name="receiver",
target=self._thread_receiver)
self._receiverthread.setDaemon(0)
self._receiverthread.start()
def __repr__(self): def __repr__(self):
""" return string representing gateway type and status. """ """ return string representing gateway type and status. """
@ -58,10 +63,9 @@ class Gateway(object):
else: else:
addr = '' addr = ''
try: try:
r = (len(self._pool.getstarted('receiver')) r = (self._receiverthread.isAlive() and "receiving" or
and "receiving" or "not receiving") "not receiving")
s = (len(self._pool.getstarted('sender')) s = "sending" # XXX
and "sending" or "not sending")
i = len(self._channelfactory.channels()) i = len(self._channelfactory.channels())
except AttributeError: except AttributeError:
r = s = "uninitialized" r = s = "uninitialized"
@ -69,9 +73,6 @@ class Gateway(object):
return "<%s%s %s/%s (%s active channels)>" %( return "<%s%s %s/%s (%s active channels)>" %(
self.__class__.__name__, addr, r, s, i) self.__class__.__name__, addr, r, s, i)
## def _local_trystopexec(self):
## self._execpool.shutdown()
def _trace(self, *args): def _trace(self, *args):
if debug: if debug:
try: try:
@ -111,35 +112,25 @@ class Gateway(object):
self._traceex(exc_info()) self._traceex(exc_info())
break break
finally: finally:
self._send(None) self._stopexec()
self._stopsend()
self._channelfactory._finished_receiving() self._channelfactory._finished_receiving()
self._trace('leaving %r' % threading.currentThread()) self._trace('leaving %r' % threading.currentThread())
def _send(self, msg): def _send(self, msg):
self._outgoing.put(msg) from sys import exc_info
if msg is None:
def _thread_sender(self): self._io.close_write()
""" thread to send Messages over the wire. """ else:
try: try:
from sys import exc_info msg.writeto(self._io)
while 1: except:
msg = self._outgoing.get() excinfo = exc_info()
try: self._traceex(excinfo)
if msg is None: msg.post_sent(self, excinfo)
self._io.close_write() else:
break msg.post_sent(self)
msg.writeto(self._io) self._trace('sent -> %r' % msg)
except:
excinfo = exc_info()
self._traceex(excinfo)
if msg is not None:
msg.post_sent(self, excinfo)
break
else:
self._trace('sent -> %r' % msg)
msg.post_sent(self)
finally:
self._trace('leaving %r' % threading.currentThread())
def _local_redirect_thread_output(self, outid, errid): def _local_redirect_thread_output(self, outid, errid):
l = [] l = []
@ -155,9 +146,58 @@ class Gateway(object):
channel.close() channel.close()
return close return close
def _thread_executor(self, channel, (source, outid, errid)): def _local_schedulexec(self, channel, sourcetask):
""" worker thread to execute source objects from the execution queue. """ if self._requestqueue is not None:
self._requestqueue.put((channel, sourcetask))
else:
# we will not execute, let's send back an error
# to inform the other side
channel.close("execution disallowed")
def _servemain(self, joining=True):
from sys import exc_info from sys import exc_info
self._initreceive(requestqueue=True)
try:
while 1:
item = self._requestqueue.get()
if item is None:
self._stopsend()
break
try:
self._executetask(item)
except StopExecLoop:
break
finally:
self._trace("_servemain finished")
if self.joining:
self.join()
def remote_init_threads(self, num=None):
""" start up to 'num' threads for subsequent
remote_exec() invocations to allow concurrent
execution.
"""
if hasattr(self, '_remotechannelthread'):
raise IOError("remote threads already running")
from py.__.thread import pool
source = py.code.Source(pool, """
execpool = WorkerPool(maxthreads=%r)
gw = channel.gateway
while 1:
task = gw._requestqueue.get()
if task is None:
gw._stopsend()
execpool.shutdown()
execpool.join()
raise StopExecLoop
execpool.dispatch(gw._executetask, task)
""" % num)
self._remotechannelthread = self.remote_exec(source)
def _executetask(self, item):
""" execute channel/source items. """
from sys import exc_info
channel, (source, outid, errid) = item
try: try:
loc = { 'channel' : channel } loc = { 'channel' : channel }
self._trace("execution starts:", repr(source)[:50]) self._trace("execution starts:", repr(source)[:50])
@ -171,6 +211,9 @@ class Gateway(object):
self._trace("execution finished:", repr(source)[:50]) self._trace("execution finished:", repr(source)[:50])
except (KeyboardInterrupt, SystemExit): except (KeyboardInterrupt, SystemExit):
pass pass
except StopExecLoop:
channel.close()
raise
except: except:
excinfo = exc_info() excinfo = exc_info()
l = traceback.format_exception(*excinfo) l = traceback.format_exception(*excinfo)
@ -180,10 +223,6 @@ class Gateway(object):
else: else:
channel.close() channel.close()
def _local_schedulexec(self, channel, sourcetask):
self._trace("dispatching exec")
self._execpool.dispatch(self._thread_executor, channel, sourcetask)
def _newredirectchannelid(self, callback): def _newredirectchannelid(self, callback):
if callback is None: if callback is None:
return return
@ -257,27 +296,25 @@ class Gateway(object):
return Handle() return Handle()
def exit(self): def exit(self):
""" Try to stop all IO activity. """ """ Try to stop all exec and IO activity. """
try: self._stopexec()
del _active_sendqueues[self._outgoing] self._stopsend()
except KeyError:
pass def _stopsend(self):
else: self._send(None)
self._send(None)
def _stopexec(self):
if self._requestqueue is not None:
self._requestqueue.put(None)
def join(self, joinexec=True): def join(self, joinexec=True):
""" Wait for all IO (and by default all execution activity) """ Wait for all IO (and by default all execution activity)
to stop. to stop. the joinexec parameter is obsolete.
""" """
current = threading.currentThread() current = threading.currentThread()
for x in self._pool.getstarted(): if self._receiverthread.isAlive():
if x != current: self._trace("joining receiver thread")
self._trace("joining %s" % x) self._receiverthread.join()
x.join()
self._trace("joining sender/reciver threads finished, current %r" % current)
if joinexec:
self._execpool.join()
self._trace("joining execution threads finished, current %r" % current)
def getid(gw, cache={}): def getid(gw, cache={}):
name = gw.__class__.__name__ name = gw.__class__.__name__
@ -288,14 +325,12 @@ def getid(gw, cache={}):
return x return x
registered_cleanup = False registered_cleanup = False
_active_sendqueues = weakref.WeakKeyDictionary() _activegateways = weakref.WeakKeyDictionary()
def cleanup_atexit(): def cleanup_atexit():
if debug: if debug:
print >>debug, "="*20 + "cleaning up" + "=" * 20 print >>debug, "="*20 + "cleaning up" + "=" * 20
debug.flush() debug.flush()
while True: while _activegateways:
try: gw, ignored = _activegateways.popitem()
queue, ignored = _active_sendqueues.popitem() gw.exit()
except KeyError: #gw.join() should work as well
break
queue.put(None)

View File

@ -43,11 +43,17 @@ import sys
def close_read(self): def close_read(self):
if self.readable: if self.readable:
self.sock.shutdown(0) try:
self.sock.shutdown(0)
except socket.error:
pass
self.readable = None self.readable = None
def close_write(self): def close_write(self):
if self.writeable: if self.writeable:
self.sock.shutdown(1) try:
self.sock.shutdown(1)
except socket.error:
pass
self.writeable = None self.writeable = None
class Popen2IO: class Popen2IO:

View File

@ -11,7 +11,6 @@ import py
startup_modules = [ startup_modules = [
'py.__.thread.io', 'py.__.thread.io',
'py.__.thread.pool',
'py.__.execnet.inputoutput', 'py.__.execnet.inputoutput',
'py.__.execnet.gateway', 'py.__.execnet.gateway',
'py.__.execnet.message', 'py.__.execnet.message',
@ -29,6 +28,8 @@ class InstallableGateway(gateway.Gateway):
def __init__(self, io): def __init__(self, io):
self._remote_bootstrap_gateway(io) self._remote_bootstrap_gateway(io)
super(InstallableGateway, self).__init__(io=io, _startcount=1) super(InstallableGateway, self).__init__(io=io, _startcount=1)
# XXX we dissallow execution form the other side
self._initreceive(requestqueue=False)
def _remote_bootstrap_gateway(self, io, extra=''): def _remote_bootstrap_gateway(self, io, extra=''):
""" return Gateway with a asynchronously remotely """ return Gateway with a asynchronously remotely
@ -41,7 +42,7 @@ class InstallableGateway(gateway.Gateway):
bootstrap = [extra] bootstrap = [extra]
bootstrap += [getsource(x) for x in startup_modules] bootstrap += [getsource(x) for x in startup_modules]
bootstrap += [io.server_stmt, bootstrap += [io.server_stmt,
"Gateway(io=io, _startcount=2).join(joinexec=False)", "Gateway(io=io, _startcount=2)._servemain()",
] ]
source = "\n".join(bootstrap) source = "\n".join(bootstrap)
self._trace("sending gateway bootstrap code") self._trace("sending gateway bootstrap code")

View File

@ -83,8 +83,7 @@ class PopenGatewayTestSetup:
class BasicRemoteExecution: class BasicRemoteExecution:
def test_correct_setup(self): def test_correct_setup(self):
for x in 'sender', 'receiver': assert self.gw._receiverthread.isAlive()
assert self.gw._pool.getstarted(x)
def test_repr_doesnt_crash(self): def test_repr_doesnt_crash(self):
assert isinstance(repr(self), str) assert isinstance(repr(self), str)
@ -373,6 +372,18 @@ class BasicRemoteExecution:
res = channel.receive() res = channel.receive()
assert res == 42 assert res == 42
def test_non_reverse_execution(self):
gw = self.gw
c1 = gw.remote_exec("""
c = channel.gateway.remote_exec("pass")
try:
c.waitclose()
except c.RemoteError, e:
channel.send(str(e))
""")
text = c1.receive()
assert text.find("execution disallowed") != -1
#class TestBlockingIssues: #class TestBlockingIssues:
# def test_join_blocked_execution_gateway(self): # def test_join_blocked_execution_gateway(self):
# gateway = py.execnet.PopenGateway() # gateway = py.execnet.PopenGateway()
@ -486,3 +497,23 @@ class TestSshGateway(BasicRemoteExecution):
# now it did # now it did
py.test.raises(IOError, gw.remote_exec, "...") py.test.raises(IOError, gw.remote_exec, "...")
def test_threads():
gw = py.execnet.PopenGateway()
gw.remote_init_threads(3)
c1 = gw.remote_exec("channel.send(channel.receive())")
c2 = gw.remote_exec("channel.send(channel.receive())")
c2.send(1)
res = c2.receive()
assert res == 1
c1.send(42)
res = c1.receive()
assert res == 42
gw.exit()
def test_threads_twice():
gw = py.execnet.PopenGateway()
gw.remote_init_threads(3)
py.test.raises(IOError, gw.remote_init_threads, 3)
gw.exit()