69 lines
		
	
	
		
			1.8 KiB
		
	
	
	
		
			Python
		
	
	
	
			
		
		
	
	
			69 lines
		
	
	
		
			1.8 KiB
		
	
	
	
		
			Python
		
	
	
	
"""
 | 
						|
    Working with multiple channels and gateways
 | 
						|
 | 
						|
"""
 | 
						|
import py
 | 
						|
from py.__.execnet.channel import RemoteError
 | 
						|
 | 
						|
# Sentinel used as the default for the ``endmarker`` argument of
# MultiChannel.make_receive_queue().  It is compared by identity, so any
# other value a caller passes (None included) is forwarded to the
# channels' setcallback() as the endmarker.
NO_ENDMARKER_WANTED = object()
 | 
						|
 | 
						|
class MultiGateway:
    """Fan operations out over a collection of gateways.

    The ``gateways`` object handed to the constructor is stored as-is and
    every method visits its members in iteration order.
    """
    RemoteError = RemoteError

    def __init__(self, gateways):
        self.gateways = gateways

    def remote_exec(self, source):
        """Call ``remote_exec(source)`` on each gateway.

        Returns a MultiChannel wrapping the per-gateway results, in
        gateway order.
        """
        return MultiChannel([gateway.remote_exec(source)
                             for gateway in self.gateways])

    def exit(self):
        """Call ``exit()`` on each gateway in turn."""
        for gateway in self.gateways:
            gateway.exit()
 | 
						|
 | 
						|
class MultiChannel:
 | 
						|
    def __init__(self, channels):
 | 
						|
        self._channels = channels
 | 
						|
 | 
						|
    def send_each(self, item):
 | 
						|
        for ch in self._channels:
 | 
						|
            ch.send(item)
 | 
						|
 | 
						|
    def receive_each(self, withchannel=False):
 | 
						|
        assert not hasattr(self, '_queue')
 | 
						|
        l = []
 | 
						|
        for ch in self._channels:
 | 
						|
            obj = ch.receive()
 | 
						|
            if withchannel:
 | 
						|
                l.append((ch, obj))
 | 
						|
            else:
 | 
						|
                l.append(obj)
 | 
						|
        return l 
 | 
						|
 | 
						|
    def make_receive_queue(self, endmarker=NO_ENDMARKER_WANTED):
 | 
						|
        try:
 | 
						|
            return self._queue
 | 
						|
        except AttributeError:
 | 
						|
            self._queue = py.std.Queue.Queue()
 | 
						|
            for ch in self._channels:
 | 
						|
                def putreceived(obj, channel=ch):
 | 
						|
                    self._queue.put((channel, obj))
 | 
						|
                if endmarker is NO_ENDMARKER_WANTED:
 | 
						|
                    ch.setcallback(putreceived)
 | 
						|
                else:
 | 
						|
                    ch.setcallback(putreceived, endmarker=endmarker)
 | 
						|
            return self._queue
 | 
						|
 | 
						|
 | 
						|
    def waitclose(self):
 | 
						|
        first = None
 | 
						|
        for ch in self._channels:
 | 
						|
            try:
 | 
						|
                ch.waitclose()
 | 
						|
            except ch.RemoteError:
 | 
						|
                if first is None:
 | 
						|
                    first = py.std.sys.exc_info()
 | 
						|
        if first:
 | 
						|
            raise first[0], first[1], first[2]
 | 
						|
 | 
						|
 |