322 lines
		
	
	
		
			10 KiB
		
	
	
	
		
			Python
		
	
	
	
			
		
		
	
	
			322 lines
		
	
	
		
			10 KiB
		
	
	
	
		
			Python
		
	
	
	
import threading, weakref, sys
 | 
						|
import Queue
 | 
						|
if 'Message' not in globals():
 | 
						|
    from py.__.execnet.message import Message
 | 
						|
 | 
						|
class RemoteError(EOFError):
 | 
						|
    """ Contains an Exceptions from the other side. """
 | 
						|
    def __init__(self, formatted):
 | 
						|
        self.formatted = formatted
 | 
						|
        EOFError.__init__(self)
 | 
						|
 | 
						|
    def __str__(self):
 | 
						|
        return self.formatted
 | 
						|
 | 
						|
    def __repr__(self):
 | 
						|
        return "%s: %s" %(self.__class__.__name__, self.formatted)
 | 
						|
 | 
						|
    def warn(self):
 | 
						|
        # XXX do this better
 | 
						|
        print >> sys.stderr, "Warning: unhandled %r" % (self,)
 | 
						|
 | 
						|
NO_ENDMARKER_WANTED = object()
 | 
						|
 | 
						|
 | 
						|
class Channel(object):
 | 
						|
    """Communication channel between two possibly remote threads of code. """
 | 
						|
    RemoteError = RemoteError
 | 
						|
 | 
						|
    def __init__(self, gateway, id):
 | 
						|
        assert isinstance(id, int)
 | 
						|
        self.gateway = gateway
 | 
						|
        self.id = id
 | 
						|
        self._items = Queue.Queue()
 | 
						|
        self._closed = False
 | 
						|
        self._receiveclosed = threading.Event()
 | 
						|
        self._remoteerrors = []
 | 
						|
 | 
						|
    def setcallback(self, callback, endmarker=NO_ENDMARKER_WANTED):
 | 
						|
        queue = self._items
 | 
						|
        lock = self.gateway._channelfactory._receivelock
 | 
						|
        lock.acquire()
 | 
						|
        try:
 | 
						|
            _callbacks = self.gateway._channelfactory._callbacks
 | 
						|
            dictvalue = (callback, endmarker)
 | 
						|
            if _callbacks.setdefault(self.id, dictvalue) != dictvalue:
 | 
						|
                raise IOError("%r has callback already registered" %(self,))
 | 
						|
            self._items = None
 | 
						|
            while 1:
 | 
						|
                try:
 | 
						|
                    olditem = queue.get(block=False)
 | 
						|
                except Queue.Empty:
 | 
						|
                    break
 | 
						|
                else:
 | 
						|
                    if olditem is ENDMARKER:
 | 
						|
                        queue.put(olditem)
 | 
						|
                        break
 | 
						|
                    else:
 | 
						|
                        callback(olditem)
 | 
						|
            if self._closed or self._receiveclosed.isSet():
 | 
						|
                # no need to keep a callback
 | 
						|
                self.gateway._channelfactory._close_callback(self.id)
 | 
						|
        finally:
 | 
						|
            lock.release()
 | 
						|
         
 | 
						|
    def __repr__(self):
 | 
						|
        flag = self.isclosed() and "closed" or "open"
 | 
						|
        return "<Channel id=%d %s>" % (self.id, flag)
 | 
						|
 | 
						|
    def __del__(self):
 | 
						|
        if self.gateway is None:   # can be None in tests
 | 
						|
            return
 | 
						|
        self.gateway._trace("Channel(%d).__del__" % self.id)
 | 
						|
        # no multithreading issues here, because we have the last ref to 'self'
 | 
						|
        if self._closed:
 | 
						|
            # state transition "closed" --> "deleted"
 | 
						|
            for error in self._remoteerrors:
 | 
						|
                error.warn()
 | 
						|
        elif self._receiveclosed.isSet():
 | 
						|
            # state transition "sendonly" --> "deleted"
 | 
						|
            # the remote channel is already in "deleted" state, nothing to do
 | 
						|
            pass
 | 
						|
        else:
 | 
						|
            # state transition "opened" --> "deleted"
 | 
						|
            if self._items is None:    # has_callback
 | 
						|
                Msg = Message.CHANNEL_LAST_MESSAGE
 | 
						|
            else:
 | 
						|
                Msg = Message.CHANNEL_CLOSE
 | 
						|
            self.gateway._outgoing.put(Msg(self.id))
 | 
						|
 | 
						|
    def _getremoteerror(self):
 | 
						|
        try:
 | 
						|
            return self._remoteerrors.pop(0)
 | 
						|
        except IndexError:
 | 
						|
            return None
 | 
						|
 | 
						|
    #
 | 
						|
    # public API for channel objects 
 | 
						|
    #
 | 
						|
    def isclosed(self):
 | 
						|
        """ return True if the channel is closed. A closed 
 | 
						|
            channel may still hold items. 
 | 
						|
        """ 
 | 
						|
        return self._closed
 | 
						|
 | 
						|
    def makefile(self, mode='w', proxyclose=False):
 | 
						|
        """ return a file-like object.  Only supported mode right
 | 
						|
            now is 'w' for binary writes.  If you want to have
 | 
						|
            a subsequent file.close() mean to close the channel
 | 
						|
            as well, then pass proxyclose=True. 
 | 
						|
        """ 
 | 
						|
        assert mode == 'w', "mode %r not availabe" %(mode,)
 | 
						|
        return ChannelFile(channel=self, proxyclose=proxyclose)
 | 
						|
 | 
						|
    def close(self, error=None):
 | 
						|
        """ close down this channel on both sides. """
 | 
						|
        if not self._closed:
 | 
						|
            # state transition "opened/sendonly" --> "closed"
 | 
						|
            # threads warning: the channel might be closed under our feet,
 | 
						|
            # but it's never damaging to send too many CHANNEL_CLOSE messages
 | 
						|
            put = self.gateway._outgoing.put
 | 
						|
            if error is not None:
 | 
						|
                put(Message.CHANNEL_CLOSE_ERROR(self.id, str(error)))
 | 
						|
            else:
 | 
						|
                put(Message.CHANNEL_CLOSE(self.id))
 | 
						|
            if isinstance(error, RemoteError):
 | 
						|
                self._remoteerrors.append(error)
 | 
						|
            self._closed = True         # --> "closed"
 | 
						|
            self._receiveclosed.set()
 | 
						|
            queue = self._items
 | 
						|
            if queue is not None:
 | 
						|
                queue.put(ENDMARKER)
 | 
						|
            self.gateway._channelfactory._no_longer_opened(self.id)
 | 
						|
 | 
						|
    def waitclose(self, timeout=None):
 | 
						|
        """ wait until this channel is closed (or the remote side
 | 
						|
        otherwise signalled that no more data was being sent).
 | 
						|
        The channel may still hold receiveable items, but not receive
 | 
						|
        more.  waitclose() reraises exceptions from executing code on
 | 
						|
        the other side as channel.RemoteErrors containing a a textual
 | 
						|
        representation of the remote traceback.
 | 
						|
        """
 | 
						|
        self._receiveclosed.wait(timeout=timeout)  # wait for non-"opened" state
 | 
						|
        if not self._receiveclosed.isSet():
 | 
						|
            raise IOError, "Timeout"
 | 
						|
        error = self._getremoteerror()
 | 
						|
        if error:
 | 
						|
            raise error
 | 
						|
 | 
						|
    def send(self, item):
 | 
						|
        """sends the given item to the other side of the channel,
 | 
						|
        possibly blocking if the sender queue is full.
 | 
						|
        Note that an item needs to be marshallable.
 | 
						|
        """
 | 
						|
        if self.isclosed(): 
 | 
						|
            raise IOError, "cannot send to %r" %(self,) 
 | 
						|
        if isinstance(item, Channel):
 | 
						|
            data = Message.CHANNEL_NEW(self.id, item.id)
 | 
						|
        else:
 | 
						|
            data = Message.CHANNEL_DATA(self.id, item)
 | 
						|
        self.gateway._outgoing.put(data)
 | 
						|
 | 
						|
    def receive(self):
 | 
						|
        """receives an item that was sent from the other side,
 | 
						|
        possibly blocking if there is none.
 | 
						|
        Note that exceptions from the other side will be
 | 
						|
        reraised as channel.RemoteError exceptions containing
 | 
						|
        a textual representation of the remote traceback.
 | 
						|
        """
 | 
						|
        queue = self._items
 | 
						|
        if queue is None:
 | 
						|
            raise IOError("calling receive() on channel with receiver callback")
 | 
						|
        x = queue.get()
 | 
						|
        if x is ENDMARKER: 
 | 
						|
            queue.put(x)  # for other receivers 
 | 
						|
            raise self._getremoteerror() or EOFError()
 | 
						|
        else: 
 | 
						|
            return x
 | 
						|
    
 | 
						|
    def __iter__(self):
 | 
						|
        return self 
 | 
						|
 | 
						|
    def next(self): 
 | 
						|
        try:
 | 
						|
            return self.receive()
 | 
						|
        except EOFError: 
 | 
						|
            raise StopIteration 
 | 
						|
 | 
						|
#
 | 
						|
# helpers
 | 
						|
#
 | 
						|
 | 
						|
ENDMARKER = object() 
 | 
						|
 | 
						|
class ChannelFactory(object):
 | 
						|
    RemoteError = RemoteError
 | 
						|
 | 
						|
    def __init__(self, gateway, startcount=1):
 | 
						|
        self._channels = weakref.WeakValueDictionary()
 | 
						|
        self._callbacks = {}
 | 
						|
        self._writelock = threading.Lock()
 | 
						|
        self._receivelock = threading.RLock()
 | 
						|
        self.gateway = gateway
 | 
						|
        self.count = startcount
 | 
						|
        self.finished = False
 | 
						|
 | 
						|
    def new(self, id=None):
 | 
						|
        """ create a new Channel with 'id' (or create new id if None). """
 | 
						|
        self._writelock.acquire()
 | 
						|
        try:
 | 
						|
            if self.finished:
 | 
						|
                raise IOError("connexion already closed: %s" % (self.gateway,))
 | 
						|
            if id is None:
 | 
						|
                id = self.count
 | 
						|
                self.count += 2
 | 
						|
            channel = Channel(self.gateway, id)
 | 
						|
            self._channels[id] = channel
 | 
						|
            return channel
 | 
						|
        finally:
 | 
						|
            self._writelock.release()
 | 
						|
 | 
						|
    def channels(self):
 | 
						|
        return self._channels.values()
 | 
						|
 | 
						|
    #
 | 
						|
    # internal methods, called from the receiver thread 
 | 
						|
    #
 | 
						|
    def _no_longer_opened(self, id):
 | 
						|
        try:
 | 
						|
            del self._channels[id]
 | 
						|
        except KeyError:
 | 
						|
            pass
 | 
						|
        self._close_callback(id)
 | 
						|
 | 
						|
    def _close_callback(self, id):
 | 
						|
        try:
 | 
						|
            callback, endmarker = self._callbacks.pop(id)
 | 
						|
        except KeyError:
 | 
						|
            pass
 | 
						|
        else:
 | 
						|
            if endmarker is not NO_ENDMARKER_WANTED:
 | 
						|
                callback(endmarker)
 | 
						|
 | 
						|
    def _local_close(self, id, remoteerror=None):
 | 
						|
        channel = self._channels.get(id)
 | 
						|
        if channel is None:
 | 
						|
            # channel already in "deleted" state
 | 
						|
            if remoteerror:
 | 
						|
                remoteerror.warn()
 | 
						|
        else:
 | 
						|
            # state transition to "closed" state
 | 
						|
            if remoteerror:
 | 
						|
                channel._remoteerrors.append(remoteerror)
 | 
						|
            channel._closed = True          # --> "closed"
 | 
						|
            channel._receiveclosed.set()
 | 
						|
            queue = channel._items
 | 
						|
            if queue is not None:
 | 
						|
                queue.put(ENDMARKER)
 | 
						|
        self._no_longer_opened(id)
 | 
						|
 | 
						|
    def _local_last_message(self, id):
 | 
						|
        channel = self._channels.get(id)
 | 
						|
        if channel is None:
 | 
						|
            # channel already in "deleted" state
 | 
						|
            pass
 | 
						|
        else:
 | 
						|
            # state transition: if "opened", change to "sendonly"
 | 
						|
            channel._receiveclosed.set()
 | 
						|
            queue = channel._items
 | 
						|
            if queue is not None:
 | 
						|
                queue.put(ENDMARKER)
 | 
						|
        self._no_longer_opened(id)
 | 
						|
 | 
						|
    def _local_receive(self, id, data): 
 | 
						|
        # executes in receiver thread
 | 
						|
        self._receivelock.acquire()
 | 
						|
        try:
 | 
						|
            try:
 | 
						|
                callback, endmarker = self._callbacks[id]
 | 
						|
            except KeyError:
 | 
						|
                channel = self._channels.get(id)
 | 
						|
                queue = channel and channel._items
 | 
						|
                if queue is None:
 | 
						|
                    pass    # drop data
 | 
						|
                else:
 | 
						|
                    queue.put(data)
 | 
						|
            else:
 | 
						|
                callback(data)   # even if channel may be already closed
 | 
						|
        finally:
 | 
						|
            self._receivelock.release()
 | 
						|
 | 
						|
    def _finished_receiving(self):
 | 
						|
        self._writelock.acquire()
 | 
						|
        try:
 | 
						|
            self.finished = True
 | 
						|
        finally:
 | 
						|
            self._writelock.release()
 | 
						|
        for id in self._channels.keys():
 | 
						|
            self._local_last_message(id)
 | 
						|
        for id in self._callbacks.keys():
 | 
						|
            self._close_callback(id)
 | 
						|
 | 
						|
 | 
						|
class ChannelFile:
 | 
						|
    def __init__(self, channel, proxyclose=True):
 | 
						|
        self.channel = channel
 | 
						|
        self._proxyclose = proxyclose 
 | 
						|
 | 
						|
    def write(self, out):
 | 
						|
        self.channel.send(out)
 | 
						|
 | 
						|
    def flush(self):
 | 
						|
        pass
 | 
						|
 | 
						|
    def close(self):
 | 
						|
        if self._proxyclose: 
 | 
						|
            self.channel.close()
 | 
						|
 | 
						|
    def __repr__(self):
 | 
						|
        state = self.channel.isclosed() and 'closed' or 'open'
 | 
						|
        return '<ChannelFile %d %s>' %(self.channel.id, state) 
 | 
						|
 |