[svn r37264] create the new development trunk

--HG--
branch : trunk
This commit is contained in:
hpk
2007-01-24 15:24:01 +01:00
commit 5992a8ef21
435 changed files with 58640 additions and 0 deletions

122
py/execnet/NOTES Normal file
View File

@@ -0,0 +1,122 @@
=============================================================================
Channel implementation notes
=============================================================================
The public API of channels make them appear either opened or closed.
When a channel is closed, we can't send any more items, and it will not
receive any more items than already queued.
Callbacks make the situation slightly more subtle. Callbacks are
attached to the ChannelFactory object, so that Channel objects can be
garbage-collected and still leave behind an active callback that can
continue to receive items.
The CHANNEL_CLOSE message is sent when a channel id is about to be removed
from the ChannelFactory, which means when the Channel object has been
garbage-collected *and* there is no callback any more.
If a Channel object is garbage-collected but the ChannelFactory has a
callback for it, a CHANNEL_LAST_MESSAGE message is sent. It is only useful
if both sides' Channel objects have an associated callback. In this
situation, CHANNEL_LAST_MESSAGE allows its receiver to un-register its own
callback; if/when in addition the receiver side also looses the last
reference to its Channel object, the Channel is closed. So in this particular
situation both sides must forget about the Channel object for it to be
automatically closed.
gateway <---> channelfactory ---> {id: weakref(channel)}
---> {id: callback}
State and invariants of Channel objects
---------------------------------------
_channels and _callbacks are dictionaries on the ChannelFactory.
Other attributes are on the Channel objects.
All states are valid at any time (even with multithreading) unless
marked with {E}, which means that they may be temporary invalid.
They are eventually restored.
States ("sendonly" means opened but won't receive any more items):
opened sendonly closed deleted
================= ============== ================== ===============
not _closed not _closed _closed <no ref left>
not _receiveclosed _receiveclosed {E} _receiveclosed
In the presence of callbacks, "deleted" does not imply "closed" nor "sendonly".
It only means that no more items can be sent. The (logical) channel can
continue to receive data via the call-back even if the channel object no
longer exists.
The two kinds of channels, with or without callback:
items read by receive() has a callback
============================= =======================================
_items is a Queue _items is None
id not in _callbacks
state==opened: id in _callbacks
{E} state==sendonly: there is {E} state!=opened: id not in _callbacks
an ENDMARKER in _items
{E} state==closed: there is
an ENDMARKER in _items
Callback calls should be considered asynchronuous. The channel can be in any
state and change its state while the callback runs.
The ChannelFactory's WeakValueDictionary _channels maps some ids to their
channel object, depending on their state:
opened sendonly closed deleted
================= ============== ================ ===============
id in _channels {E} not in {E} not in not in
All received RemoteErrors are handled exactly once: they are normally
re-raised once in waitclose() or receive(). If it is not possible, they are
at the moment dumped to stderr. (XXX should use logging/tracing)
Only channels in {E} "closed" state can hold RemoteErrors.
Methods:
* close() returns with the channel in "closed" state
* send() either send the data or raise if "closed"
* receive() wait for the next item. If no item left and the state
changes to non-"opened", raise
* waitclose() wait for a non-"opened" state
Assuming the channel is connected and the connection is alive, the local state
eventually influences the state of the corresponding remote channel object:
local | opened sendonly closed deleted
remote |
=======================================================
|
opened | ok n/a (1) (2)
|
sendonly | n/a n/a n/a ok
|
closed | (1) n/a ok ok
|
deleted | (2) ok ok ok
(1) The side with the closed channel object must send a CHANNEL_CLOSE message,
which will eventually put the other side's channel in "closed" state if
it is still "opened".
(2) If the deleted channel has no callback, this is equivalent to (1).
Otherwide, the side with the deleted channel must send a
CHANNEL_LAST_MESSAGE, which will eventually put the other side's channel in
"sendonly" state if it is still "opened".
n/a These configuration should never occur.

1
py/execnet/__init__.py Normal file
View File

@@ -0,0 +1 @@
#

321
py/execnet/channel.py Normal file
View File

@@ -0,0 +1,321 @@
import threading, weakref, sys
import Queue
if 'Message' not in globals():
from py.__.execnet.message import Message
class RemoteError(EOFError):
""" Contains an Exceptions from the other side. """
def __init__(self, formatted):
self.formatted = formatted
EOFError.__init__(self)
def __str__(self):
return self.formatted
def __repr__(self):
return "%s: %s" %(self.__class__.__name__, self.formatted)
def warn(self):
# XXX do this better
print >> sys.stderr, "Warning: unhandled %r" % (self,)
NO_ENDMARKER_WANTED = object()
class Channel(object):
"""Communication channel between two possibly remote threads of code. """
RemoteError = RemoteError
def __init__(self, gateway, id):
assert isinstance(id, int)
self.gateway = gateway
self.id = id
self._items = Queue.Queue()
self._closed = False
self._receiveclosed = threading.Event()
self._remoteerrors = []
def setcallback(self, callback, endmarker=NO_ENDMARKER_WANTED):
queue = self._items
lock = self.gateway.channelfactory._receivelock
lock.acquire()
try:
_callbacks = self.gateway.channelfactory._callbacks
dictvalue = (callback, endmarker)
if _callbacks.setdefault(self.id, dictvalue) != dictvalue:
raise IOError("%r has callback already registered" %(self,))
self._items = None
while 1:
try:
olditem = queue.get(block=False)
except Queue.Empty:
break
else:
if olditem is ENDMARKER:
queue.put(olditem)
break
else:
callback(olditem)
if self._closed or self._receiveclosed.isSet():
# no need to keep a callback
self.gateway.channelfactory._close_callback(self.id)
finally:
lock.release()
def __repr__(self):
flag = self.isclosed() and "closed" or "open"
return "<Channel id=%d %s>" % (self.id, flag)
def __del__(self):
if self.gateway is None: # can be None in tests
return
self.gateway._trace("Channel(%d).__del__" % self.id)
# no multithreading issues here, because we have the last ref to 'self'
if self._closed:
# state transition "closed" --> "deleted"
for error in self._remoteerrors:
error.warn()
elif self._receiveclosed.isSet():
# state transition "sendonly" --> "deleted"
# the remote channel is already in "deleted" state, nothing to do
pass
else:
# state transition "opened" --> "deleted"
if self._items is None: # has_callback
Msg = Message.CHANNEL_LAST_MESSAGE
else:
Msg = Message.CHANNEL_CLOSE
self.gateway._outgoing.put(Msg(self.id))
def _getremoteerror(self):
try:
return self._remoteerrors.pop(0)
except IndexError:
return None
#
# public API for channel objects
#
def isclosed(self):
""" return True if the channel is closed. A closed
channel may still hold items.
"""
return self._closed
def makefile(self, mode='w', proxyclose=False):
""" return a file-like object. Only supported mode right
now is 'w' for binary writes. If you want to have
a subsequent file.close() mean to close the channel
as well, then pass proxyclose=True.
"""
assert mode == 'w', "mode %r not availabe" %(mode,)
return ChannelFile(channel=self, proxyclose=proxyclose)
def close(self, error=None):
""" close down this channel on both sides. """
if not self._closed:
# state transition "opened/sendonly" --> "closed"
# threads warning: the channel might be closed under our feet,
# but it's never damaging to send too many CHANNEL_CLOSE messages
put = self.gateway._outgoing.put
if error is not None:
put(Message.CHANNEL_CLOSE_ERROR(self.id, str(error)))
else:
put(Message.CHANNEL_CLOSE(self.id))
if isinstance(error, RemoteError):
self._remoteerrors.append(error)
self._closed = True # --> "closed"
self._receiveclosed.set()
queue = self._items
if queue is not None:
queue.put(ENDMARKER)
self.gateway.channelfactory._no_longer_opened(self.id)
def waitclose(self, timeout=None):
""" wait until this channel is closed (or the remote side
otherwise signalled that no more data was being sent).
The channel may still hold receiveable items, but not receive
more. waitclose() reraises exceptions from executing code on
the other side as channel.RemoteErrors containing a a textual
representation of the remote traceback.
"""
self._receiveclosed.wait(timeout=timeout) # wait for non-"opened" state
if not self._receiveclosed.isSet():
raise IOError, "Timeout"
error = self._getremoteerror()
if error:
raise error
def send(self, item):
"""sends the given item to the other side of the channel,
possibly blocking if the sender queue is full.
Note that an item needs to be marshallable.
"""
if self.isclosed():
raise IOError, "cannot send to %r" %(self,)
if isinstance(item, Channel):
data = Message.CHANNEL_NEW(self.id, item.id)
else:
data = Message.CHANNEL_DATA(self.id, item)
self.gateway._outgoing.put(data)
def receive(self):
"""receives an item that was sent from the other side,
possibly blocking if there is none.
Note that exceptions from the other side will be
reraised as channel.RemoteError exceptions containing
a textual representation of the remote traceback.
"""
queue = self._items
if queue is None:
raise IOError("calling receive() on channel with receiver callback")
x = queue.get()
if x is ENDMARKER:
queue.put(x) # for other receivers
raise self._getremoteerror() or EOFError()
else:
return x
def __iter__(self):
return self
def next(self):
try:
return self.receive()
except EOFError:
raise StopIteration
#
# helpers
#
ENDMARKER = object()
class ChannelFactory(object):
RemoteError = RemoteError
def __init__(self, gateway, startcount=1):
self._channels = weakref.WeakValueDictionary()
self._callbacks = {}
self._writelock = threading.Lock()
self._receivelock = threading.RLock()
self.gateway = gateway
self.count = startcount
self.finished = False
def new(self, id=None):
""" create a new Channel with 'id' (or create new id if None). """
self._writelock.acquire()
try:
if self.finished:
raise IOError("connexion already closed: %s" % (self.gateway,))
if id is None:
id = self.count
self.count += 2
channel = Channel(self.gateway, id)
self._channels[id] = channel
return channel
finally:
self._writelock.release()
def channels(self):
return self._channels.values()
#
# internal methods, called from the receiver thread
#
def _no_longer_opened(self, id):
try:
del self._channels[id]
except KeyError:
pass
self._close_callback(id)
def _close_callback(self, id):
try:
callback, endmarker = self._callbacks.pop(id)
except KeyError:
pass
else:
if endmarker is not NO_ENDMARKER_WANTED:
callback(endmarker)
def _local_close(self, id, remoteerror=None):
channel = self._channels.get(id)
if channel is None:
# channel already in "deleted" state
if remoteerror:
remoteerror.warn()
else:
# state transition to "closed" state
if remoteerror:
channel._remoteerrors.append(remoteerror)
channel._closed = True # --> "closed"
channel._receiveclosed.set()
queue = channel._items
if queue is not None:
queue.put(ENDMARKER)
self._no_longer_opened(id)
def _local_last_message(self, id):
channel = self._channels.get(id)
if channel is None:
# channel already in "deleted" state
pass
else:
# state transition: if "opened", change to "sendonly"
channel._receiveclosed.set()
queue = channel._items
if queue is not None:
queue.put(ENDMARKER)
self._no_longer_opened(id)
def _local_receive(self, id, data):
# executes in receiver thread
self._receivelock.acquire()
try:
try:
callback, endmarker = self._callbacks[id]
except KeyError:
channel = self._channels.get(id)
queue = channel and channel._items
if queue is None:
pass # drop data
else:
queue.put(data)
else:
callback(data) # even if channel may be already closed
finally:
self._receivelock.release()
def _finished_receiving(self):
self._writelock.acquire()
try:
self.finished = True
finally:
self._writelock.release()
for id in self._channels.keys():
self._local_last_message(id)
for id in self._callbacks.keys():
self._close_callback(id)
class ChannelFile:
def __init__(self, channel, proxyclose=True):
self.channel = channel
self._proxyclose = proxyclose
def write(self, out):
self.channel.send(out)
def flush(self):
pass
def close(self):
if self._proxyclose:
self.channel.close()
def __repr__(self):
state = self.channel.isclosed() and 'closed' or 'open'
return '<ChannelFile %d %s>' %(self.channel.id, state)

309
py/execnet/gateway.py Normal file
View File

@@ -0,0 +1,309 @@
import os
import threading
import Queue
import traceback
import atexit
import weakref
import __future__
# note that the whole code of this module (as well as some
# other modules) execute not only on the local side but
# also on any gateway's remote side. On such remote sides
# we cannot assume the py library to be there and
# InstallableGateway.remote_bootstrap_gateway() (located
# in register.py) will take care to send source fragments
# to the other side. Yes, it is fragile but we have a
# few tests that try to catch when we mess up.
# XXX the following lines should not be here
if 'ThreadOut' not in globals():
import py
from py.code import Source
from py.__.execnet.channel import ChannelFactory, Channel
from py.__.execnet.message import Message
ThreadOut = py._thread.ThreadOut
WorkerPool = py._thread.WorkerPool
NamedThreadPool = py._thread.NamedThreadPool
import os
debug = 0 # open('/tmp/execnet-debug-%d' % os.getpid() , 'wa')
sysex = (KeyboardInterrupt, SystemExit)
class Gateway(object):
num_worker_threads = 2
ThreadOut = ThreadOut
def __init__(self, io, startcount=2, maxthreads=None):
global registered_cleanup
self._execpool = WorkerPool()
## self.running = True
self.io = io
self._outgoing = Queue.Queue()
self.channelfactory = ChannelFactory(self, startcount)
## self._exitlock = threading.Lock()
if not registered_cleanup:
atexit.register(cleanup_atexit)
registered_cleanup = True
_active_sendqueues[self._outgoing] = True
self.pool = NamedThreadPool(receiver = self.thread_receiver,
sender = self.thread_sender)
def __repr__(self):
addr = self._getremoteaddress()
if addr:
addr = '[%s]' % (addr,)
else:
addr = ''
r = (len(self.pool.getstarted('receiver'))
and "receiving" or "not receiving")
s = (len(self.pool.getstarted('sender'))
and "sending" or "not sending")
i = len(self.channelfactory.channels())
return "<%s%s %s/%s (%d active channels)>" %(
self.__class__.__name__, addr, r, s, i)
def _getremoteaddress(self):
return None
## def _local_trystopexec(self):
## self._execpool.shutdown()
def _trace(self, *args):
if debug:
try:
l = "\n".join(args).split(os.linesep)
id = getid(self)
for x in l:
print >>debug, x
debug.flush()
except sysex:
raise
except:
traceback.print_exc()
def _traceex(self, excinfo):
try:
l = traceback.format_exception(*excinfo)
errortext = "".join(l)
except:
errortext = '%s: %s' % (excinfo[0].__name__,
excinfo[1])
self._trace(errortext)
def thread_receiver(self):
""" thread to read and handle Messages half-sync-half-async. """
try:
from sys import exc_info
while 1:
try:
msg = Message.readfrom(self.io)
self._trace("received <- %r" % msg)
msg.received(self)
except sysex:
raise
except EOFError:
break
except:
self._traceex(exc_info())
break
finally:
self._outgoing.put(None)
self.channelfactory._finished_receiving()
self._trace('leaving %r' % threading.currentThread())
def thread_sender(self):
""" thread to send Messages over the wire. """
try:
from sys import exc_info
while 1:
msg = self._outgoing.get()
try:
if msg is None:
self.io.close_write()
break
msg.writeto(self.io)
except:
excinfo = exc_info()
self._traceex(excinfo)
if msg is not None:
msg.post_sent(self, excinfo)
raise
else:
self._trace('sent -> %r' % msg)
msg.post_sent(self)
finally:
self._trace('leaving %r' % threading.currentThread())
def _local_redirect_thread_output(self, outid, errid):
l = []
for name, id in ('stdout', outid), ('stderr', errid):
if id:
channel = self.channelfactory.new(outid)
out = ThreadOut(sys, name)
out.setwritefunc(channel.send)
l.append((out, channel))
def close():
for out, channel in l:
out.delwritefunc()
channel.close()
return close
def thread_executor(self, channel, (source, outid, errid)):
""" worker thread to execute source objects from the execution queue. """
from sys import exc_info
try:
loc = { 'channel' : channel }
self._trace("execution starts:", repr(source)[:50])
close = self._local_redirect_thread_output(outid, errid)
try:
co = compile(source+'\n', '', 'exec',
__future__.CO_GENERATOR_ALLOWED)
exec co in loc
finally:
close()
self._trace("execution finished:", repr(source)[:50])
except (KeyboardInterrupt, SystemExit):
raise
except:
excinfo = exc_info()
l = traceback.format_exception(*excinfo)
errortext = "".join(l)
channel.close(errortext)
self._trace(errortext)
else:
channel.close()
def _local_schedulexec(self, channel, sourcetask):
self._trace("dispatching exec")
self._execpool.dispatch(self.thread_executor, channel, sourcetask)
def _newredirectchannelid(self, callback):
if callback is None:
return
if hasattr(callback, 'write'):
callback = callback.write
assert callable(callback)
chan = self.newchannel()
chan.setcallback(callback)
return chan.id
# _____________________________________________________________________
#
# High Level Interface
# _____________________________________________________________________
#
def newchannel(self):
""" return new channel object. """
return self.channelfactory.new()
def remote_exec(self, source, stdout=None, stderr=None):
""" return channel object for communicating with the asynchronously
executing 'source' code which will have a corresponding 'channel'
object in its executing namespace.
"""
try:
source = str(Source(source))
except NameError:
try:
import py
source = str(py.code.Source(source))
except ImportError:
pass
channel = self.newchannel()
outid = self._newredirectchannelid(stdout)
errid = self._newredirectchannelid(stderr)
self._outgoing.put(Message.CHANNEL_OPEN(channel.id,
(source, outid, errid)))
return channel
def remote_redirect(self, stdout=None, stderr=None):
""" return a handle representing a redirection of a remote
end's stdout to a local file object. with handle.close()
the redirection will be reverted.
"""
clist = []
for name, out in ('stdout', stdout), ('stderr', stderr):
if out:
outchannel = self.newchannel()
outchannel.setcallback(getattr(out, 'write', out))
channel = self.remote_exec("""
import sys
outchannel = channel.receive()
outchannel.gateway.ThreadOut(sys, %r).setdefaultwriter(outchannel.send)
""" % name)
channel.send(outchannel)
clist.append(channel)
for c in clist:
c.waitclose(1.0)
class Handle:
def close(_):
for name, out in ('stdout', stdout), ('stderr', stderr):
if out:
c = self.remote_exec("""
import sys
channel.gateway.ThreadOut(sys, %r).resetdefault()
""" % name)
c.waitclose(1.0)
return Handle()
## def exit(self):
## """ initiate full gateway teardown.
## Note that the teardown of sender/receiver threads happens
## asynchronously and timeouts on stopping worker execution
## threads are ignored. You can issue join() or join(joinexec=False)
## if you want to wait for a full teardown (possibly excluding
## execution threads).
## """
## # note that threads may still be scheduled to start
## # during our execution!
## self._exitlock.acquire()
## try:
## if self.running:
## self.running = False
## if not self.pool.getstarted('sender'):
## raise IOError("sender thread not alive anymore!")
## self._outgoing.put(None)
## self._trace("exit procedure triggered, pid %d " % (os.getpid(),))
## _gateways.remove(self)
## finally:
## self._exitlock.release()
def exit(self):
self._outgoing.put(None)
try:
del _active_sendqueues[self._outgoing]
except KeyError:
pass
def join(self, joinexec=True):
current = threading.currentThread()
for x in self.pool.getstarted():
if x != current:
self._trace("joining %s" % x)
x.join()
self._trace("joining sender/reciver threads finished, current %r" % current)
if joinexec:
self._execpool.join()
self._trace("joining execution threads finished, current %r" % current)
def getid(gw, cache={}):
name = gw.__class__.__name__
try:
return cache.setdefault(name, {})[id(gw)]
except KeyError:
cache[name][id(gw)] = x = "%s:%s.%d" %(os.getpid(), gw.__class__.__name__, len(cache[name]))
return x
registered_cleanup = False
_active_sendqueues = weakref.WeakKeyDictionary()
def cleanup_atexit():
if debug:
print >>debug, "="*20 + "cleaning up" + "=" * 20
debug.flush()
while True:
try:
queue, ignored = _active_sendqueues.popitem()
except KeyError:
break
queue.put(None)

98
py/execnet/inputoutput.py Normal file
View File

@@ -0,0 +1,98 @@
"""
InputOutput Classes used for connecting gateways
across process or computer barriers.
"""
import socket, os, sys
class SocketIO:
server_stmt = """
io = SocketIO(clientsock)
import sys
#try:
# sys.stdout = sys.stderr = open('/tmp/execnet-socket-debug.log', 'a', 0)
#except (IOError, OSError):
# sys.stdout = sys.stderr = open('/dev/null', 'w')
#print '='*60
"""
error = (socket.error, EOFError)
def __init__(self, sock):
self.sock = sock
try:
sock.setsockopt(socket.SOL_TCP, socket.TCP_NODELAY, 1)
sock.setsockopt(socket.SOL_IP, socket.IP_TOS, 0x10) # IPTOS_LOWDELAY
except socket.error, e:
print "WARNING: Cannot set socket option:", str(e)
self.readable = self.writeable = True
def read(self, numbytes):
"Read exactly 'bytes' bytes from the socket."
buf = ""
while len(buf) < numbytes:
t = self.sock.recv(numbytes - len(buf))
#print 'recv -->', len(t)
if not t:
raise EOFError
buf += t
return buf
def write(self, data):
"""write out all bytes to the socket. """
self.sock.sendall(data)
def close_read(self):
if self.readable:
self.sock.shutdown(0)
self.readable = None
def close_write(self):
if self.writeable:
self.sock.shutdown(1)
self.writeable = None
class Popen2IO:
server_stmt = """
import sys, StringIO
io = Popen2IO(sys.stdout, sys.stdin)
sys.stdout = sys.stderr = StringIO.StringIO()
#try:
# sys.stdout = sys.stderr = open('/tmp/execnet-popen-debug.log', 'a', 0)
#except (IOError, OSError):
# sys.stdout = sys.stderr = open('/dev/null', 'w')
#print '='*60
"""
error = (IOError, OSError, EOFError)
def __init__(self, infile, outfile):
if sys.platform == 'win32':
import msvcrt
msvcrt.setmode(infile.fileno(), os.O_BINARY)
msvcrt.setmode(outfile.fileno(), os.O_BINARY)
self.outfile, self.infile = infile, outfile
self.readable = self.writeable = True
def read(self, numbytes):
"""Read exactly 'bytes' bytes from the pipe. """
#import sys
#print >> sys.stderr, "reading..."
s = self.infile.read(numbytes)
#print >> sys.stderr, "read: %r" % s
if len(s) < numbytes:
raise EOFError
return s
def write(self, data):
"""write out all bytes to the pipe. """
#import sys
#print >> sys.stderr, "writing: %r" % data
self.outfile.write(data)
self.outfile.flush()
def close_read(self):
if self.readable:
self.infile.close()
self.readable = None
def close_write(self):
if self.writeable:
self.outfile.close()
self.writeable = None

102
py/execnet/message.py Normal file
View File

@@ -0,0 +1,102 @@
import struct
#import marshal
# ___________________________________________________________________________
#
# Messages
# ___________________________________________________________________________
# the header format
HDR_FORMAT = "!hhii"
HDR_SIZE = struct.calcsize(HDR_FORMAT)
class Message:
""" encapsulates Messages and their wire protocol. """
_types = {}
def __init__(self, channelid=0, data=''):
self.channelid = channelid
self.data = data
def writeto(self, io):
# XXX marshal.dumps doesn't work for exchanging data across Python
# version :-((( There is no sane solution, short of a custom
# pure Python marshaller
data = self.data
if isinstance(data, str):
dataformat = 1
else:
data = repr(self.data) # argh
dataformat = 2
header = struct.pack(HDR_FORMAT, self.msgtype, dataformat,
self.channelid, len(data))
io.write(header + data)
def readfrom(cls, io):
header = io.read(HDR_SIZE)
(msgtype, dataformat,
senderid, stringlen) = struct.unpack(HDR_FORMAT, header)
data = io.read(stringlen)
if dataformat == 1:
pass
elif dataformat == 2:
data = eval(data, {}) # reversed argh
else:
raise ValueError("bad data format")
msg = cls._types[msgtype](senderid, data)
return msg
readfrom = classmethod(readfrom)
def post_sent(self, gateway, excinfo=None):
pass
def __repr__(self):
r = repr(self.data)
if len(r) > 50:
return "<Message.%s channelid=%d len=%d>" %(self.__class__.__name__,
self.channelid, len(r))
else:
return "<Message.%s channelid=%d %r>" %(self.__class__.__name__,
self.channelid, self.data)
def _setupmessages():
class CHANNEL_OPEN(Message):
def received(self, gateway):
channel = gateway.channelfactory.new(self.channelid)
gateway._local_schedulexec(channel=channel, sourcetask=self.data)
class CHANNEL_NEW(Message):
def received(self, gateway):
""" receive a remotely created new (sub)channel. """
newid = self.data
newchannel = gateway.channelfactory.new(newid)
gateway.channelfactory._local_receive(self.channelid, newchannel)
class CHANNEL_DATA(Message):
def received(self, gateway):
gateway.channelfactory._local_receive(self.channelid, self.data)
class CHANNEL_CLOSE(Message):
def received(self, gateway):
gateway.channelfactory._local_close(self.channelid)
class CHANNEL_CLOSE_ERROR(Message):
def received(self, gateway):
remote_error = gateway.channelfactory.RemoteError(self.data)
gateway.channelfactory._local_close(self.channelid, remote_error)
class CHANNEL_LAST_MESSAGE(Message):
def received(self, gateway):
gateway.channelfactory._local_last_message(self.channelid)
classes = [x for x in locals().values() if hasattr(x, '__bases__')]
classes.sort(lambda x,y : cmp(x.__name__, y.__name__))
i = 0
for cls in classes:
Message._types[i] = cls
cls.msgtype = i
setattr(Message, cls.__name__, cls)
i+=1
_setupmessages()

233
py/execnet/register.py Normal file
View File

@@ -0,0 +1,233 @@
import os, inspect, socket
import sys
from py.magic import autopath ; mypath = autopath()
import py
# the list of modules that must be send to the other side
# for bootstrapping gateways
# XXX we want to have a cleaner bootstrap mechanism
# by making sure early that we have the py lib available
# in a sufficient version
startup_modules = [
'py.__.thread.io',
'py.__.thread.pool',
'py.__.execnet.inputoutput',
'py.__.execnet.gateway',
'py.__.execnet.message',
'py.__.execnet.channel',
]
def getsource(dottedname):
mod = __import__(dottedname, None, None, ['__doc__'])
return inspect.getsource(mod)
from py.__.execnet import inputoutput, gateway
class InstallableGateway(gateway.Gateway):
""" initialize gateways on both sides of a inputoutput object. """
def __init__(self, io):
self.remote_bootstrap_gateway(io)
super(InstallableGateway, self).__init__(io=io, startcount=1)
def remote_bootstrap_gateway(self, io, extra=''):
""" return Gateway with a asynchronously remotely
initialized counterpart Gateway (which may or may not succeed).
Note that the other sides gateways starts enumerating
its channels with even numbers while the sender
gateway starts with odd numbers. This allows to
uniquely identify channels across both sides.
"""
bootstrap = ["we_are_remote=True", extra]
bootstrap += [getsource(x) for x in startup_modules]
bootstrap += [io.server_stmt, "Gateway(io=io, startcount=2).join(joinexec=False)",]
source = "\n".join(bootstrap)
self._trace("sending gateway bootstrap code")
io.write('%r\n' % source)
class PopenCmdGateway(InstallableGateway):
def __init__(self, cmd):
infile, outfile = os.popen2(cmd)
io = inputoutput.Popen2IO(infile, outfile)
super(PopenCmdGateway, self).__init__(io=io)
## self._pidchannel = self.remote_exec("""
## import os
## channel.send(os.getpid())
## """)
## def exit(self):
## try:
## self._pidchannel.waitclose(timeout=0.5)
## pid = self._pidchannel.receive()
## except IOError:
## self._trace("IOError: could not receive child PID:")
## self._traceex(sys.exc_info())
## pid = None
## super(PopenCmdGateway, self).exit()
## if pid is not None:
## self._trace("waiting for pid %s" % pid)
## try:
## os.waitpid(pid, 0)
## except KeyboardInterrupt:
## if sys.platform != "win32":
## os.kill(pid, 15)
## raise
## except OSError, e:
## self._trace("child process %s already dead? error:%s" %
## (pid, str(e)))
class PopenGateway(PopenCmdGateway):
# use sysfind/sysexec/subprocess instead of os.popen?
def __init__(self, python=sys.executable):
cmd = '%s -u -c "exec input()"' % python
super(PopenGateway, self).__init__(cmd)
def remote_bootstrap_gateway(self, io, extra=''):
# XXX the following hack helps us to import the same version
# of the py lib and other dependcies, but only works for
# PopenGateways because we can assume to have access to
# the same filesystem
# --> we definitely need proper remote imports working
# across any kind of gateway!
x = py.path.local(py.__file__).dirpath().dirpath()
ppath = os.environ.get('PYTHONPATH', '')
plist = [str(x)] + ppath.split(':')
s = "\n".join([extra,
"import sys ; sys.path[:0] = %r" % (plist,),
"import os ; os.environ['PYTHONPATH'] = %r" % ppath,
# redirect file descriptors 0 and 1 to /dev/null, to avoid
# complete confusion (this is independent from the sys.stdout
# and sys.stderr redirection that gateway.remote_exec() can do)
# note that we redirect fd 2 on win too, since for some reason that
# blocks there, while it works (sending to stderr if possible else
# ignoring) on *nix
str(py.code.Source("""
try:
devnull = os.devnull
except AttributeError:
if os.name == 'nt':
devnull = 'NUL'
else:
devnull = '/dev/null'
sys.stdin = os.fdopen(os.dup(0), 'rb', 0)
sys.stdout = os.fdopen(os.dup(1), 'wb', 0)
if os.name == 'nt':
sys.stderr = os.fdopen(os.dup(2), 'wb', 0)
fd = os.open(devnull, os.O_RDONLY)
os.dup2(fd, 0)
os.close(fd)
fd = os.open(devnull, os.O_WRONLY)
os.dup2(fd, 1)
if os.name == 'nt':
os.dup2(fd, 2)
os.close(fd)
""")),
""
])
super(PopenGateway, self).remote_bootstrap_gateway(io, s)
class SocketGateway(InstallableGateway):
def __init__(self, host, port):
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.host = host = str(host)
self.port = port = int(port)
sock.connect((host, port))
io = inputoutput.SocketIO(sock)
InstallableGateway.__init__(self, io=io)
def _getremoteaddress(self):
return '%s:%d' % (self.host, self.port)
def remote_install(cls, gateway, hostport=None):
""" return a connected socket gateway through the
given gateway.
"""
if hostport is None:
host, port = ('', 0)
else:
host, port = hostport
socketserverbootstrap = py.code.Source(
mypath.dirpath('script', 'socketserver.py').read('rU'),
"""
import socket
sock = bind_and_listen((%r, %r))
hostname = socket.gethostname()
channel.send((hostname, sock.getsockname()))
startserver(sock)
""" % (host, port))
# execute the above socketserverbootstrap on the other side
channel = gateway.remote_exec(socketserverbootstrap)
hostname, (realhost, realport) = channel.receive()
if not hostname:
realhost = hostname
#gateway._trace("remote_install received"
# "port=%r, hostname = %r" %(realport, hostname))
return py.execnet.SocketGateway(realhost, realport)
remote_install = classmethod(remote_install)
class SshGateway(PopenCmdGateway):
def __init__(self, sshaddress, remotepython='python', identity=None):
self.sshaddress = sshaddress
remotecmd = '%s -u -c "exec input()"' % (remotepython,)
cmdline = [sshaddress, remotecmd]
# XXX Unix style quoting
for i in range(len(cmdline)):
cmdline[i] = "'" + cmdline[i].replace("'", "'\\''") + "'"
cmd = 'ssh -C'
if identity is not None:
cmd += ' -i %s' % (identity,)
cmdline.insert(0, cmd)
super(SshGateway, self).__init__(' '.join(cmdline))
def _getremoteaddress(self):
return self.sshaddress
class ExecGateway(PopenGateway):
def remote_exec_sync_stdcapture(self, lines, callback):
# hack: turn the content of the cell into
#
# if 1:
# line1
# line2
# ...
#
lines = [' ' + line for line in lines]
lines.insert(0, 'if 1:')
lines.append('')
sourcecode = '\n'.join(lines)
try:
callbacks = self.callbacks
except AttributeError:
callbacks = self.callbacks = {}
answerid = id(callback)
self.callbacks[answerid] = callback
self.exec_remote('''
import sys, StringIO
try:
execns
except:
execns = {}
oldout, olderr = sys.stdout, sys.stderr
try:
buffer = StringIO.StringIO()
sys.stdout = sys.stderr = buffer
try:
exec compile(%(sourcecode)r, 'single') in execns
except:
import traceback
traceback.print_exc()
finally:
sys.stdout=oldout
sys.stderr=olderr
# fiddle us (the caller) into executing the callback on remote answers
gateway.exec_remote(
"gateway.invoke_callback(%(answerid)r, %%r)" %% buffer.getvalue())
''' % locals())
def invoke_callback(self, answerid, value):
callback = self.callbacks[answerid]
del self.callbacks[answerid]
callback(value)

162
py/execnet/rsync.py Normal file
View File

@@ -0,0 +1,162 @@
import py, md5
def rsync(gw, sourcedir, destdir, **options):
for name in options:
assert name in ('delete',)
channel = gw.remote_exec("""
import os, stat, shutil, md5
destdir, options = channel.receive()
modifiedfiles = []
def remove(path):
assert path.startswith(destdir)
try:
os.unlink(path)
except OSError:
# assume it's a dir
shutil.rmtree(path)
def receive_directory_structure(path, relcomponents):
#print "receive directory structure", path
try:
st = os.lstat(path)
except OSError:
st = None
msg = channel.receive()
if isinstance(msg, list):
if st and not stat.S_ISDIR(st.st_mode):
os.unlink(path)
st = None
if not st:
os.mkdir(path)
entrynames = {}
for entryname in msg:
receive_directory_structure(os.path.join(path, entryname),
relcomponents + [entryname])
entrynames[entryname] = True
if options.get('delete'):
for othername in os.listdir(path):
if othername not in entrynames:
otherpath = os.path.join(path, othername)
remove(otherpath)
else:
if st and stat.S_ISREG(st.st_mode):
f = file(path, 'rb')
data = f.read()
f.close()
mychecksum = md5.md5(data).digest()
else:
if st:
remove(path)
mychecksum = None
if mychecksum != msg:
channel.send(relcomponents)
modifiedfiles.append(path)
receive_directory_structure(destdir, [])
channel.send(None) # end marker
for path in modifiedfiles:
data = channel.receive()
f = open(path, 'wb')
f.write(data)
f.close()
""")
channel.send((str(destdir), options))
def send_directory_structure(path):
if path.check(dir=1):
subpaths = path.listdir()
print "sending directory structure", path
channel.send([p.basename for p in subpaths])
for p in subpaths:
send_directory_structure(p)
elif path.check(file=1):
data = path.read()
checksum = md5.md5(data).digest()
channel.send(checksum)
else:
raise ValueError, "cannot sync %r" % (path,)
send_directory_structure(sourcedir)
while True:
modified_rel_path = channel.receive()
if modified_rel_path is None:
break
modifiedpath = sourcedir.join(*modified_rel_path)
data = modifiedpath.read()
channel.send(data)
channel.waitclose()
def copy(gw, source, dest):
channel = gw.remote_exec("""
import md5
localfilename = channel.receive()
try:
f = file(localfilename, 'rb')
existingdata = f.read()
f.close()
except (IOError, OSError):
mycrc = None
else:
mycrc = md5.md5(existingdata).digest()
remotecrc = channel.receive()
if remotecrc == mycrc:
channel.send(None)
else:
channel.send(localfilename)
newdata = channel.receive()
f = file(localfilename, 'wb')
f.write(newdata)
f.close()
""")
channel.send(str(dest))
f = file(str(source), 'rb')
localdata = f.read()
f.close()
channel.send(md5.md5(localdata).digest())
status = channel.receive()
if status is not None:
assert status == str(dest) # for now
channel.send(localdata)
channel.waitclose()
def setup_module(mod):
mod.gw = py.execnet.PopenGateway()
def teardown_module(mod):
mod.gw.exit()
def test_filecopy():
dir = py.test.ensuretemp('filecopy')
source = dir.ensure('source')
dest = dir.join('dest')
source.write('hello world')
copy(gw, source, dest)
assert dest.check(file=1)
assert dest.read() == 'hello world'
source.write('something else')
copy(gw, source, dest)
assert dest.check(file=1)
assert dest.read() == 'something else'
def test_dirsync():
base = py.test.ensuretemp('dirsync')
dest = base.join('dest')
source = base.mkdir('source')
for s in ('content1', 'content2'):
source.ensure('subdir', 'file1').write(s)
rsync(gw, source, dest)
assert dest.join('subdir').check(dir=1)
assert dest.join('subdir', 'file1').check(file=1)
assert dest.join('subdir', 'file1').read() == s
source.join('subdir').remove('file1')
rsync(gw, source, dest)
assert dest.join('subdir', 'file1').check(file=1)
rsync(gw, source, dest, delete=True)
assert not dest.join('subdir', 'file1').check()

View File

@@ -0,0 +1,16 @@
"""
send a "quit" signal to a remote server
"""
import sys
import socket
hostport = sys.argv[1]
host, port = hostport.split(':')
hostport = (host, int(port))
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.connect(hostport)
sock.sendall('"raise KeyboardInterrupt"\n')

84
py/execnet/script/shell.py Executable file
View File

@@ -0,0 +1,84 @@
#! /usr/bin/env python
"""
a remote python shell
for injection into startserver.py
"""
import sys, os, socket, select
try:
clientsock
except NameError:
print "client side starting"
import sys
host, port = sys.argv[1].split(':')
port = int(port)
myself = open(os.path.abspath(sys.argv[0]), 'rU').read()
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.connect((host, port))
sock.sendall(repr(myself)+'\n')
print "send boot string"
inputlist = [ sock, sys.stdin ]
try:
while 1:
r,w,e = select.select(inputlist, [], [])
if sys.stdin in r:
line = raw_input()
sock.sendall(line + '\n')
if sock in r:
line = sock.recv(4096)
sys.stdout.write(line)
sys.stdout.flush()
except:
import traceback
print traceback.print_exc()
sys.exit(1)
print "server side starting"
# server side
#
from traceback import print_exc
from threading import Thread
class promptagent(Thread):
def __init__(self, clientsock):
Thread.__init__(self)
self.clientsock = clientsock
def run(self):
print "Entering thread prompt loop"
clientfile = self.clientsock.makefile('w')
filein = self.clientsock.makefile('r')
loc = self.clientsock.getsockname()
while 1:
try:
clientfile.write('%s %s >>> ' % loc)
clientfile.flush()
line = filein.readline()
if len(line)==0: raise EOFError,"nothing"
#print >>sys.stderr,"got line: " + line
if line.strip():
oldout, olderr = sys.stdout, sys.stderr
sys.stdout, sys.stderr = clientfile, clientfile
try:
try:
exec compile(line + '\n','<remote pyin>', 'single')
except:
print_exc()
finally:
sys.stdout=oldout
sys.stderr=olderr
clientfile.flush()
except EOFError,e:
print >>sys.stderr, "connection close, prompt thread returns"
break
#print >>sys.stdout, "".join(apply(format_exception,sys.exc_info()))
self.clientsock.close()
prompter = promptagent(clientsock)
prompter.start()
print "promptagent - thread started"

View File

@@ -0,0 +1,87 @@
#! /usr/bin/env python
"""
start socket based minimal readline exec server
"""
# this part of the program only executes on the server side
#
progname = 'socket_readline_exec_server-1.2'
debug = 0
import sys, socket, os
try:
import fcntl
except ImportError:
fcntl = None
if debug: # and not os.isatty(sys.stdin.fileno()):
f = open('/tmp/execnet-socket-pyout.log', 'a', 0)
old = sys.stdout, sys.stderr
sys.stdout = sys.stderr = f
#import py
#compile = py.code.compile
def exec_from_one_connection(serversock):
print progname, 'Entering Accept loop', serversock.getsockname()
clientsock,address = serversock.accept()
print progname, 'got new connection from %s %s' % address
clientfile = clientsock.makefile('r+',0)
print "reading line"
# rstrip so that we can use \r\n for telnet testing
source = clientfile.readline().rstrip()
clientfile.close()
g = {'clientsock' : clientsock, 'address' : address}
source = eval(source)
if source:
co = compile(source+'\n', source, 'exec')
print progname, 'compiled source, executing'
try:
exec co in g
finally:
print progname, 'finished executing code'
# background thread might hold a reference to this (!?)
#clientsock.close()
def bind_and_listen(hostport):
if isinstance(hostport, str):
host, port = hostport.split(':')
hostport = (host, int(port))
serversock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
# set close-on-exec
if hasattr(fcntl, 'FD_CLOEXEC'):
old = fcntl.fcntl(serversock.fileno(), fcntl.F_GETFD)
fcntl.fcntl(serversock.fileno(), fcntl.F_SETFD, old | fcntl.FD_CLOEXEC)
# allow the address to be re-used in a reasonable amount of time
if os.name == 'posix' and sys.platform != 'cygwin':
serversock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
serversock.bind(hostport)
serversock.listen(5)
return serversock
def startserver(serversock, loop=False):
try:
while 1:
try:
exec_from_one_connection(serversock)
except (KeyboardInterrupt, SystemExit):
raise
except:
import traceback
traceback.print_exc()
if not loop:
break
finally:
print "leaving socketserver execloop"
serversock.shutdown(2)
if __name__ == '__main__':
import sys
if len(sys.argv)>1:
hostport = sys.argv[1]
else:
hostport = ':8888'
serversock = bind_and_listen(hostport)
startserver(serversock, loop=True)

View File

@@ -0,0 +1,91 @@
"""
A windows service wrapper for the py.execnet socketserver.
To use, run:
python socketserverservice.py register
net start ExecNetSocketServer
"""
import sys
import os
import time
import win32serviceutil
import win32service
import win32event
import win32evtlogutil
import servicemanager
import threading
import socketserver
appname = 'ExecNetSocketServer'
class SocketServerService(win32serviceutil.ServiceFramework):
_svc_name_ = appname
_svc_display_name_ = "%s" % appname
_svc_deps_ = ["EventLog"]
def __init__(self, args):
# The exe-file has messages for the Event Log Viewer.
# Register the exe-file as event source.
#
# Probably it would be better if this is done at installation time,
# so that it also could be removed if the service is uninstalled.
# Unfortunately it cannot be done in the 'if __name__ == "__main__"'
# block below, because the 'frozen' exe-file does not run this code.
#
win32evtlogutil.AddSourceToRegistry(self._svc_display_name_,
servicemanager.__file__,
"Application")
win32serviceutil.ServiceFramework.__init__(self, args)
self.hWaitStop = win32event.CreateEvent(None, 0, 0, None)
self.WAIT_TIME = 1000 # in milliseconds
def SvcStop(self):
self.ReportServiceStatus(win32service.SERVICE_STOP_PENDING)
win32event.SetEvent(self.hWaitStop)
def SvcDoRun(self):
# Redirect stdout and stderr to prevent "IOError: [Errno 9]
# Bad file descriptor". Windows services don't have functional
# output streams.
sys.stdout = sys.stderr = open('nul', 'w')
# Write a 'started' event to the event log...
win32evtlogutil.ReportEvent(self._svc_display_name_,
servicemanager.PYS_SERVICE_STARTED,
0, # category
servicemanager.EVENTLOG_INFORMATION_TYPE,
(self._svc_name_, ''))
print "Begin: %s" % (self._svc_display_name_)
hostport = ':8888'
print 'Starting py.execnet SocketServer on %s' % hostport
serversock = socketserver.bind_and_listen(hostport)
thread = threading.Thread(target=socketserver.startserver,
args=(serversock,),
kwargs={'loop':True})
thread.setDaemon(True)
thread.start()
# wait to be stopped or self.WAIT_TIME to pass
while True:
result = win32event.WaitForSingleObject(self.hWaitStop,
self.WAIT_TIME)
if result == win32event.WAIT_OBJECT_0:
break
# write a 'stopped' event to the event log.
win32evtlogutil.ReportEvent(self._svc_display_name_,
servicemanager.PYS_SERVICE_STOPPED,
0, # category
servicemanager.EVENTLOG_INFORMATION_TYPE,
(self._svc_name_, ''))
print "End: %s" % appname
if __name__ == '__main__':
# Note that this code will not be run in the 'frozen' exe-file!!!
win32serviceutil.HandleCommandLine(SocketServerService)

9
py/execnet/script/xx.py Normal file
View File

@@ -0,0 +1,9 @@
import rlcompleter2
rlcompleter2.setup()
import register, sys
try:
hostport = sys.argv[1]
except:
hostport = ':8888'
gw = register.ServerGateway(hostport)

View File

@@ -0,0 +1 @@
#

View File

@@ -0,0 +1,469 @@
from __future__ import generators
import os, sys, time, signal
import py
from py.__.execnet import gateway
from py.__.conftest import option
mypath = py.magic.autopath()
from StringIO import StringIO
from py.__.execnet.register import startup_modules, getsource
def test_getsource_import_modules():
for dottedname in startup_modules:
yield getsource, dottedname
def test_getsource_no_colision():
seen = {}
for dottedname in startup_modules:
mod = __import__(dottedname, None, None, ['__doc__'])
for name, value in vars(mod).items():
if py.std.inspect.isclass(value):
if name in seen:
olddottedname, oldval = seen[name]
if oldval is not value:
py.test.fail("duplicate class %r in %s and %s" %
(name, dottedname, olddottedname))
seen[name] = (dottedname, value)
class TestMessage:
def test_wire_protocol(self):
for cls in gateway.Message._types.values():
one = StringIO()
cls(42, '23').writeto(one)
two = StringIO(one.getvalue())
msg = gateway.Message.readfrom(two)
assert isinstance(msg, cls)
assert msg.channelid == 42
assert msg.data == '23'
assert isinstance(repr(msg), str)
# == "<Message.%s channelid=42 '23'>" %(msg.__class__.__name__, )
class TestPureChannel:
def setup_method(self, method):
self.fac = gateway.ChannelFactory(None)
def test_factory_create(self):
chan1 = self.fac.new()
assert chan1.id == 1
chan2 = self.fac.new()
assert chan2.id == 3
def test_factory_getitem(self):
chan1 = self.fac.new()
assert self.fac._channels[chan1.id] == chan1
chan2 = self.fac.new()
assert self.fac._channels[chan2.id] == chan2
def test_channel_timeouterror(self):
channel = self.fac.new()
py.test.raises(IOError, channel.waitclose, timeout=0.01)
class PopenGatewayTestSetup:
def setup_class(cls):
cls.gw = py.execnet.PopenGateway()
## def teardown_class(cls):
## cls.gw.exit()
class BasicRemoteExecution:
def test_correct_setup(self):
for x in 'sender', 'receiver':
assert self.gw.pool.getstarted(x)
def test_repr_doesnt_crash(self):
assert isinstance(repr(self), str)
def test_correct_setup_no_py(self):
channel = self.gw.remote_exec("""
import sys
channel.send(sys.modules.keys())
""")
remotemodules = channel.receive()
assert 'py' not in remotemodules, (
"py should not be imported on remote side")
def test_remote_exec_waitclose(self):
channel = self.gw.remote_exec('pass')
channel.waitclose(timeout=1.0)
def test_remote_exec_waitclose_2(self):
channel = self.gw.remote_exec('def gccycle(): pass')
channel.waitclose(timeout=1.0)
def test_remote_exec_waitclose_noarg(self):
channel = self.gw.remote_exec('pass')
channel.waitclose()
def test_remote_exec_error_after_close(self):
channel = self.gw.remote_exec('pass')
channel.waitclose(timeout=1.0)
py.test.raises(IOError, channel.send, 0)
def test_remote_exec_channel_anonymous(self):
channel = self.gw.remote_exec('''
obj = channel.receive()
channel.send(obj)
''')
channel.send(42)
result = channel.receive()
assert result == 42
def test_channel_close_and_then_receive_error(self):
channel = self.gw.remote_exec('raise ValueError')
py.test.raises(channel.RemoteError, channel.receive)
def test_channel_finish_and_then_EOFError(self):
channel = self.gw.remote_exec('channel.send(42)')
x = channel.receive()
assert x == 42
py.test.raises(EOFError, channel.receive)
py.test.raises(EOFError, channel.receive)
py.test.raises(EOFError, channel.receive)
def test_channel_close_and_then_receive_error_multiple(self):
channel = self.gw.remote_exec('channel.send(42) ; raise ValueError')
x = channel.receive()
assert x == 42
py.test.raises(channel.RemoteError, channel.receive)
def test_channel__local_close(self):
channel = self.gw.channelfactory.new()
self.gw.channelfactory._local_close(channel.id)
channel.waitclose(0.1)
def test_channel__local_close_error(self):
channel = self.gw.channelfactory.new()
self.gw.channelfactory._local_close(channel.id,
channel.RemoteError("error"))
py.test.raises(channel.RemoteError, channel.waitclose, 0.01)
def test_channel_error_reporting(self):
channel = self.gw.remote_exec('def foo():\n return foobar()\nfoo()\n')
try:
channel.receive()
except channel.RemoteError, e:
assert str(e).startswith('Traceback (most recent call last):')
assert str(e).find('NameError: global name \'foobar\' '
'is not defined') > -1
else:
py.test.fail('No exception raised')
def test_channel_syntax_error(self):
# missing colon
channel = self.gw.remote_exec('def foo()\n return 1\nfoo()\n')
try:
channel.receive()
except channel.RemoteError, e:
assert str(e).startswith('Traceback (most recent call last):')
assert str(e).find('SyntaxError') > -1
def test_channel_iter(self):
channel = self.gw.remote_exec("""
for x in range(3):
channel.send(x)
""")
l = list(channel)
assert l == range(3)
def test_channel_passing_over_channel(self):
channel = self.gw.remote_exec('''
c = channel.gateway.newchannel()
channel.send(c)
c.send(42)
''')
c = channel.receive()
x = c.receive()
assert x == 42
# check that the both sides previous channels are really gone
channel.waitclose(0.3)
assert channel.id not in self.gw.channelfactory._channels
#assert c.id not in self.gw.channelfactory
newchan = self.gw.remote_exec('''
assert %d not in channel.gateway.channelfactory._channels
''' % (channel.id))
newchan.waitclose(0.3)
def test_channel_receiver_callback(self):
l = []
#channel = self.gw.newchannel(receiver=l.append)
channel = self.gw.remote_exec(source='''
channel.send(42)
channel.send(13)
channel.send(channel.gateway.newchannel())
''')
channel.setcallback(callback=l.append)
py.test.raises(IOError, channel.receive)
channel.waitclose(1.0)
assert len(l) == 3
assert l[:2] == [42,13]
assert isinstance(l[2], channel.__class__)
def test_channel_callback_after_receive(self):
l = []
channel = self.gw.remote_exec(source='''
channel.send(42)
channel.send(13)
channel.send(channel.gateway.newchannel())
''')
x = channel.receive()
assert x == 42
channel.setcallback(callback=l.append)
py.test.raises(IOError, channel.receive)
channel.waitclose(1.0)
assert len(l) == 2
assert l[0] == 13
assert isinstance(l[1], channel.__class__)
def test_waiting_for_callbacks(self):
l = []
def callback(msg):
import time; time.sleep(0.2)
l.append(msg)
channel = self.gw.remote_exec(source='''
channel.send(42)
''')
channel.setcallback(callback)
channel.waitclose(1.0)
assert l == [42]
def test_channel_callback_stays_active(self, earlyfree=True):
# with 'earlyfree==True', this tests the "sendonly" channel state.
l = []
channel = self.gw.remote_exec(source='''
import thread, time
def producer(subchannel):
for i in range(5):
time.sleep(0.15)
subchannel.send(i*100)
channel2 = channel.receive()
thread.start_new_thread(producer, (channel2,))
del channel2
''')
subchannel = self.gw.newchannel()
subchannel.setcallback(l.append)
channel.send(subchannel)
if earlyfree:
subchannel = None
counter = 100
while len(l) < 5:
if subchannel and subchannel.isclosed():
break
counter -= 1
print counter
if not counter:
py.test.fail("timed out waiting for the answer[%d]" % len(l))
time.sleep(0.04) # busy-wait
assert l == [0, 100, 200, 300, 400]
return subchannel
def test_channel_callback_remote_freed(self):
channel = self.test_channel_callback_stays_active(False)
channel.waitclose(1.0) # freed automatically at the end of producer()
def test_channel_endmarker_callback(self):
l = []
channel = self.gw.remote_exec(source='''
channel.send(42)
channel.send(13)
channel.send(channel.gateway.newchannel())
''')
channel.setcallback(l.append, 999)
py.test.raises(IOError, channel.receive)
channel.waitclose(1.0)
assert len(l) == 4
assert l[:2] == [42,13]
assert isinstance(l[2], channel.__class__)
assert l[3] == 999
def test_remote_redirect_stdout(self):
out = py.std.StringIO.StringIO()
handle = self.gw.remote_redirect(stdout=out)
c = self.gw.remote_exec("print 42")
c.waitclose(1.0)
handle.close()
s = out.getvalue()
assert s.strip() == "42"
def test_remote_exec_redirect_multi(self):
num = 3
l = [[] for x in range(num)]
channels = [self.gw.remote_exec("print %d" % i, stdout=l[i].append)
for i in range(num)]
for x in channels:
x.waitclose(1.0)
for i in range(num):
subl = l[i]
assert subl
s = subl[0]
assert s.strip() == str(i)
def test_channel_file(self):
channel = self.gw.remote_exec("""
f = channel.makefile()
print >>f, "hello world"
f.close()
channel.send(42)
""")
first = channel.receive() + channel.receive()
assert first.strip() == 'hello world'
second = channel.receive()
assert second == 42
def test_channel_file_write_error(self):
channel = self.gw.remote_exec("pass")
f = channel.makefile()
channel.waitclose(1.0)
py.test.raises(IOError, f.write, 'hello')
def test_channel_file_proxyclose(self):
channel = self.gw.remote_exec("""
f = channel.makefile(proxyclose=True)
print >>f, "hello world"
f.close()
channel.send(42)
""")
first = channel.receive() + channel.receive()
assert first.strip() == 'hello world'
py.test.raises(EOFError, channel.receive)
def test_confusion_from_os_write_stdout(self):
channel = self.gw.remote_exec("""
import os
os.write(1, 'confusion!')
channel.send(channel.receive() * 6)
channel.send(channel.receive() * 6)
""")
channel.send(3)
res = channel.receive()
assert res == 18
channel.send(7)
res = channel.receive()
assert res == 42
def test_confusion_from_os_write_stderr(self):
channel = self.gw.remote_exec("""
import os
os.write(2, 'test')
channel.send(channel.receive() * 6)
channel.send(channel.receive() * 6)
""")
channel.send(3)
res = channel.receive()
assert res == 18
channel.send(7)
res = channel.receive()
assert res == 42
#class TestBlockingIssues:
# def test_join_blocked_execution_gateway(self):
# gateway = py.execnet.PopenGateway()
# channel = gateway.remote_exec("""
# time.sleep(5.0)
# """)
# def doit():
# gateway.exit()
# gateway.join(joinexec=True)
# return 17
#
# pool = py._thread.WorkerPool()
# reply = pool.dispatch(doit)
# x = reply.get(timeout=1.0)
# assert x == 17
class TestPopenGateway(PopenGatewayTestSetup, BasicRemoteExecution):
#disabled = True
def test_chdir_separation(self):
old = py.test.ensuretemp('chdirtest').chdir()
try:
gw = py.execnet.PopenGateway()
finally:
waschangedir = old.chdir()
c = gw.remote_exec("import os ; channel.send(os.getcwd())")
x = c.receive()
assert x == str(waschangedir)
def test_many_popen(self):
num = 4
l = []
for i in range(num):
l.append(py.execnet.PopenGateway())
channels = []
for gw in l:
channel = gw.remote_exec("""channel.send(42)""")
channels.append(channel)
## try:
## while channels:
## channel = channels.pop()
## try:
## ret = channel.receive()
## assert ret == 42
## finally:
## channel.gateway.exit()
## finally:
## for x in channels:
## x.gateway.exit()
while channels:
channel = channels.pop()
ret = channel.receive()
assert ret == 42
def disabled_test_remote_is_killed_while_sending(self):
gw = py.execnet.PopenGateway()
channel = gw.remote_exec("""
import os
import time
channel.send(os.getppid())
channel.send(os.getpid())
while 1:
channel.send('#'*1000)
time.sleep(10)
""")
parent = channel.receive()
remote = channel.receive()
assert parent == os.getpid()
time.sleep(0.5)
os.kill(remote, signal.SIGKILL)
time.sleep(1)
channel.waitclose(1.0)
py.test.raises(EOFError, channel.receive)
#channel.waitclose(1.0)
#channel.send('#')
class SocketGatewaySetup:
def setup_class(cls):
# open a gateway to a fresh child process
cls.proxygw = py.execnet.PopenGateway()
cls.gw = py.execnet.SocketGateway.remote_install(cls.proxygw)
## def teardown_class(cls):
## cls.gw.exit()
## cls.proxygw.exit()
class TestSocketGateway(SocketGatewaySetup, BasicRemoteExecution):
pass
class TestSshGateway(BasicRemoteExecution):
def setup_class(cls):
if option.sshtarget is None:
py.test.skip("no known ssh target, use -S to set one")
cls.gw = py.execnet.SshGateway(option.sshtarget)
def test_sshaddress(self):
assert self.gw.sshaddress == option.sshtarget
def test_failed_connexion(self):
gw = py.execnet.SshGateway('nowhere.codespeak.net')
try:
channel = gw.remote_exec("...")
except IOError:
pass # connexion failed already
else:
# connexion did not fail yet
py.test.raises(EOFError, channel.receive)
# now it did
py.test.raises(IOError, gw.remote_exec, "...")

View File

@@ -0,0 +1,42 @@
from StringIO import StringIO
def pickletest(mod):
f1 = StringIO()
f2 = StringIO()
pickler1 = mod.Pickler(f1)
unpickler1 = mod.Unpickler(f2)
pickler2 = mod.Pickler(f2)
unpickler2 = mod.Unpickler(f1)
#pickler1.memo = unpickler1.memo = {}
#pickler2.memo = unpickler2.memo = {}
d = {}
pickler1.dump(d)
f1.seek(0)
d_other = unpickler2.load()
# translate unpickler2 memo to pickler2
pickler2.memo = dict([(id(obj), (int(x), obj))
for x, obj in unpickler2.memo.items()])
pickler2.dump(d_other)
f2.seek(0)
unpickler1.memo = dict([(str(x), y) for x, y in pickler1.memo.values()])
d_back = unpickler1.load()
assert d is d_back
def test_cpickle():
import cPickle
pickletest(cPickle)
def test_pickle():
import pickle
pickletest(pickle)