* simplify stdout/stderr handling and modules and for now remove support

for directly stdout/stderr directly on remote_exec

--HG--
branch : trunk
This commit is contained in:
holger krekel 2009-09-02 19:39:24 +02:00
parent 73fc2f01f2
commit 6c3e961bc5
3 changed files with 22 additions and 56 deletions

View File

@ -9,7 +9,6 @@ from py.__.execnet.gateway_base import ExecnetAPI
# XXX we'd like to have a leaner and meaner bootstrap mechanism # XXX we'd like to have a leaner and meaner bootstrap mechanism
startup_modules = [ startup_modules = [
'py.__.thread.io',
'py.__.execnet.gateway_base', 'py.__.execnet.gateway_base',
] ]
@ -48,7 +47,7 @@ class InitiatingGateway(BaseGateway):
self._remote_bootstrap_gateway(io) self._remote_bootstrap_gateway(io)
super(InitiatingGateway, self).__init__(io=io, _startcount=1) super(InitiatingGateway, self).__init__(io=io, _startcount=1)
# XXX we dissallow execution form the other side # XXX we dissallow execution form the other side
self._initreceive(requestqueue=False) self._initreceive()
self.hook = py._com.HookRelay(ExecnetAPI, py._com.comregistry) self.hook = py._com.HookRelay(ExecnetAPI, py._com.comregistry)
self.hook.pyexecnet_gateway_init(gateway=self) self.hook.pyexecnet_gateway_init(gateway=self)
self._cleanup.register(self) self._cleanup.register(self)
@ -103,20 +102,15 @@ class InitiatingGateway(BaseGateway):
self._cache_rinfo = RInfo(**ch.receive()) self._cache_rinfo = RInfo(**ch.receive())
return self._cache_rinfo return self._cache_rinfo
def remote_exec(self, source, stdout=None, stderr=None): def remote_exec(self, source):
""" return channel object and connect it to a remote """ return channel object and connect it to a remote
execution thread where the given 'source' executes execution thread where the given 'source' executes
and has the sister 'channel' object in its global and has the sister 'channel' object in its global
namespace. The callback functions 'stdout' and namespace.
'stderr' get called on receival of remote
stdout/stderr output strings.
""" """
source = str(py.code.Source(source)) source = str(py.code.Source(source))
channel = self.newchannel() channel = self.newchannel()
outid = self._newredirectchannelid(stdout) self._send(Message.CHANNEL_OPEN(channel.id, source))
errid = self._newredirectchannelid(stderr)
self._send(Message.CHANNEL_OPEN(
channel.id, (source, outid, errid)))
return channel return channel
def remote_init_threads(self, num=None): def remote_init_threads(self, num=None):
@ -131,7 +125,7 @@ class InitiatingGateway(BaseGateway):
execpool = WorkerPool(maxthreads=%r) execpool = WorkerPool(maxthreads=%r)
gw = channel.gateway gw = channel.gateway
while 1: while 1:
task = gw._requestqueue.get() task = gw._execqueue.get()
if task is None: if task is None:
gw._stopsend() gw._stopsend()
execpool.shutdown() execpool.shutdown()
@ -141,21 +135,13 @@ class InitiatingGateway(BaseGateway):
""" % num) """ % num)
self._remotechannelthread = self.remote_exec(source) self._remotechannelthread = self.remote_exec(source)
def _newredirectchannelid(self, callback):
if callback is None:
return
if hasattr(callback, 'write'):
callback = callback.write
assert callable(callback)
chan = self.newchannel()
chan.setcallback(callback)
return chan.id
def _remote_redirect(self, stdout=None, stderr=None): def _remote_redirect(self, stdout=None, stderr=None):
""" return a handle representing a redirection of a remote """ return a handle representing a redirection of a remote
end's stdout to a local file object. with handle.close() end's stdout to a local file object. with handle.close()
the redirection will be reverted. the redirection will be reverted.
""" """
# XXX implement a remote_exec_in_globals(...)
# to send ThreadOut implementation over
clist = [] clist = []
for name, out in ('stdout', stdout), ('stderr', stderr): for name, out in ('stdout', stdout), ('stderr', stderr):
if out: if out:
@ -164,7 +150,7 @@ class InitiatingGateway(BaseGateway):
channel = self.remote_exec(""" channel = self.remote_exec("""
import sys import sys
outchannel = channel.receive() outchannel = channel.receive()
outchannel.gateway._ThreadOut(sys, %r).setdefaultwriter(outchannel.send) ThreadOut(sys, %r).setdefaultwriter(outchannel.send)
""" % name) """ % name)
channel.send(outchannel) channel.send(outchannel)
clist.append(channel) clist.append(channel)

View File

@ -18,11 +18,6 @@ try:
except ImportError: except ImportError:
import Queue as queue import Queue as queue
# XXX the following lines should not be here
if 'ThreadOut' not in globals():
import py
ThreadOut = py._thread.ThreadOut
if sys.version_info > (3, 0): if sys.version_info > (3, 0):
exec("""def do_exec(co, loc): exec("""def do_exec(co, loc):
exec(co, loc)""") exec(co, loc)""")
@ -594,14 +589,14 @@ class ExecnetAPI:
def pyexecnet_gwmanage_rsyncfinish(self, source, gateways): def pyexecnet_gwmanage_rsyncfinish(self, source, gateways):
""" called after rsyncing a directory to remote gateways takes place. """ """ called after rsyncing a directory to remote gateways takes place. """
class BaseGateway(object): class BaseGateway(object):
hook = ExecnetAPI() hook = ExecnetAPI()
exc_info = sys.exc_info exc_info = sys.exc_info
class _StopExecLoop(Exception): pass class _StopExecLoop(Exception):
_ThreadOut = ThreadOut pass
_requestqueue = None
def __init__(self, io, _startcount=2): def __init__(self, io, _startcount=2):
""" initialize core gateway, using the given inputoutput object. """ initialize core gateway, using the given inputoutput object.
@ -610,9 +605,7 @@ class BaseGateway(object):
self._channelfactory = ChannelFactory(self, _startcount) self._channelfactory = ChannelFactory(self, _startcount)
self._receivelock = threading.RLock() self._receivelock = threading.RLock()
def _initreceive(self, requestqueue=False): def _initreceive(self):
if requestqueue:
self._requestqueue = queue.Queue()
self._receiverthread = threading.Thread(name="receiver", self._receiverthread = threading.Thread(name="receiver",
target=self._thread_receiver) target=self._thread_receiver)
self._receiverthread.setDaemon(1) self._receiverthread.setDaemon(1)
@ -678,36 +671,23 @@ class BaseGateway(object):
self._send(None) self._send(None)
def _stopexec(self): def _stopexec(self):
if self._requestqueue is not None: if hasattr(self, '_execqueue'):
self._requestqueue.put(None) self._execqueue.put(None)
def _local_redirect_thread_output(self, outid, errid):
l = []
for name, id in ('stdout', outid), ('stderr', errid):
if id:
channel = self._channelfactory.new(outid)
out = self._ThreadOut(sys, name)
out.setwritefunc(channel.send)
l.append((out, channel))
def close():
for out, channel in l:
out.delwritefunc()
channel.close()
return close
def _local_schedulexec(self, channel, sourcetask): def _local_schedulexec(self, channel, sourcetask):
if self._requestqueue is not None: if hasattr(self, '_execqueue'):
self._requestqueue.put((channel, sourcetask)) self._execqueue.put((channel, sourcetask))
else: else:
# we will not execute, let's send back an error # we will not execute, let's send back an error
# to inform the other side # to inform the other side
channel.close("execution disallowed") channel.close("execution disallowed")
def _servemain(self, joining=True): def _servemain(self, joining=True):
self._initreceive(requestqueue=True) self._execqueue = queue.Queue()
self._initreceive()
try: try:
while 1: while 1:
item = self._requestqueue.get() item = self._execqueue.get()
if item is None: if item is None:
self._stopsend() self._stopsend()
break break
@ -722,17 +702,15 @@ class BaseGateway(object):
def _executetask(self, item): def _executetask(self, item):
""" execute channel/source items. """ """ execute channel/source items. """
channel, (source, outid, errid) = item channel, source = item
try: try:
loc = { 'channel' : channel, '__name__': '__channelexec__'} loc = { 'channel' : channel, '__name__': '__channelexec__'}
#open("task.py", 'w').write(source) #open("task.py", 'w').write(source)
self._trace("execution starts: %s" % repr(source)[:50]) self._trace("execution starts: %s" % repr(source)[:50])
close = self._local_redirect_thread_output(outid, errid)
try: try:
co = compile(source+'\n', '', 'exec') co = compile(source+'\n', '', 'exec')
do_exec(co, loc) do_exec(co, loc)
finally: finally:
close()
self._trace("execution finished") self._trace("execution finished")
except sysex: except sysex:
pass pass

View File

@ -429,6 +429,7 @@ class BasicRemoteExecution:
assert err assert err
assert str(err).find("ValueError") != -1 assert str(err).find("ValueError") != -1
@py.test.mark.xfail
def test_remote_redirect_stdout(self): def test_remote_redirect_stdout(self):
out = py.io.TextIO() out = py.io.TextIO()
handle = self.gw._remote_redirect(stdout=out) handle = self.gw._remote_redirect(stdout=out)
@ -438,6 +439,7 @@ class BasicRemoteExecution:
s = out.getvalue() s = out.getvalue()
assert s.strip() == "42" assert s.strip() == "42"
@py.test.mark.xfail
def test_remote_exec_redirect_multi(self): def test_remote_exec_redirect_multi(self):
num = 3 num = 3
l = [[] for x in range(num)] l = [[] for x in range(num)]