59 lines
		
	
	
		
			1.7 KiB
		
	
	
	
		
			Python
		
	
	
	
			
		
		
	
	
			59 lines
		
	
	
		
			1.7 KiB
		
	
	
	
		
			Python
		
	
	
	
""" 
 | 
						|
    tests for 
 | 
						|
    - multi channels and multi gateways 
 | 
						|
 | 
						|
"""
 | 
						|
 | 
						|
import py
 | 
						|
 | 
						|
class TestMultiChannelAndGateway:
    """Tests for py.execnet.MultiChannel and py.execnet.MultiGateway."""

    def test_multichannel_receive_each(self):
        """receive_each() yields one item per channel, in channel order."""
        class pseudochannel:
            # Minimal stand-in: only the receive() method is exercised here.
            def receive(self):
                return 12

        pc1 = pseudochannel()
        pc2 = pseudochannel()
        multichannel = py.execnet.MultiChannel([pc1, pc2])

        # withchannel=True pairs every received item with its channel.
        pairs = multichannel.receive_each(withchannel=True)
        assert len(pairs) == 2
        assert pairs == [(pc1, 12), (pc2, 12)]

        # withchannel=False returns the bare items.
        items = multichannel.receive_each(withchannel=False)
        assert items == [12, 12]

    def test_multichannel_send_each(self):
        """send_each() delivers the same item to every remote channel."""
        gateways = [py.execnet.PopenGateway() for _ in range(2)]
        try:
            gm = py.execnet.MultiGateway(gateways)
            mc = gm.remote_exec("""
            import os
            channel.send(channel.receive() + 1)
        """)
            mc.send_each(41)
            replies = mc.receive_each()
            assert replies == [42, 42]
        finally:
            # Always shut the subprocess gateways down, even when an
            # assertion above fails, so no child processes are left behind.
            for gw in gateways:
                gw.exit()

    def test_multichannel_receive_queue_for_two_subprocesses(self):
        """The receive queue delivers one (channel, item) pair per gateway."""
        gateways = [py.execnet.PopenGateway() for _ in range(2)]
        try:
            gm = py.execnet.MultiGateway(gateways)
            mc = gm.remote_exec("""
            import os
            channel.send(os.getpid())
        """)
            queue = mc.make_receive_queue()
            ch, item = queue.get(timeout=10)
            ch2, item2 = queue.get(timeout=10)

            # Two distinct subprocesses: different channels, different
            # gateways and different process ids.
            assert ch != ch2
            assert ch.gateway != ch2.gateway
            assert item != item2
            mc.waitclose()
        finally:
            # Always shut the subprocess gateways down, even when an
            # assertion above fails, so no child processes are left behind.
            for gw in gateways:
                gw.exit()

    def test_multichannel_waitclose(self):
        """waitclose() calls waitclose() on every wrapped channel."""
        calls = []

        class pseudochannel:
            # Records each waitclose() invocation in the enclosing list.
            def waitclose(self):
                calls.append(0)

        multichannel = py.execnet.MultiChannel([pseudochannel(), pseudochannel()])
        multichannel.waitclose()
        assert len(calls) == 2
 | 
						|
 |