execnet cleanup/refinements: avoid creating a shell for each subprocess
* introduce HostNotFound, raised for Socket and SshGateways * factored out basic tests, cleaned up existing tests * removed sshgateway identity argument which was deprecated in 1.0 --HG-- branch : trunk
This commit is contained in:
		
							parent
							
								
									734a40eb28
								
							
						
					
					
						commit
						5cf27098cf
					
				|  | @ -165,6 +165,7 @@ initpkg(__name__, | ||||||
|     'execnet.SocketGateway'  : ('./execnet/gateway.py', 'SocketGateway'), |     'execnet.SocketGateway'  : ('./execnet/gateway.py', 'SocketGateway'), | ||||||
|     'execnet.PopenGateway'   : ('./execnet/gateway.py', 'PopenGateway'), |     'execnet.PopenGateway'   : ('./execnet/gateway.py', 'PopenGateway'), | ||||||
|     'execnet.SshGateway'     : ('./execnet/gateway.py', 'SshGateway'), |     'execnet.SshGateway'     : ('./execnet/gateway.py', 'SshGateway'), | ||||||
|  |     'execnet.HostNotFound'   : ('./execnet/gateway.py', 'HostNotFound'), | ||||||
|     'execnet.XSpec'          : ('./execnet/xspec.py', 'XSpec'), |     'execnet.XSpec'          : ('./execnet/xspec.py', 'XSpec'), | ||||||
|     'execnet.makegateway'    : ('./execnet/xspec.py', 'makegateway'), |     'execnet.makegateway'    : ('./execnet/xspec.py', 'makegateway'), | ||||||
|     'execnet.MultiGateway'   : ('./execnet/multi.py', 'MultiGateway'), |     'execnet.MultiGateway'   : ('./execnet/multi.py', 'MultiGateway'), | ||||||
|  |  | ||||||
|  | @ -195,11 +195,8 @@ channel.send(dict( | ||||||
| """ | """ | ||||||
| 
 | 
 | ||||||
| class PopenCmdGateway(InitiatingGateway): | class PopenCmdGateway(InitiatingGateway): | ||||||
|     def __init__(self, cmd): |     def __init__(self, args): | ||||||
|         # on win close_fds=True does not work, not sure it'd needed |         self._popen = p = Popen(args, stdin=PIPE, stdout=PIPE)  | ||||||
|         #p = Popen(cmd, shell=True, stdin=PIPE, stdout=PIPE, close_fds=True) |  | ||||||
|         self._popen = p = Popen(cmd, shell=True, stdin=PIPE, stdout=PIPE)  |  | ||||||
|         self._cmd = cmd |  | ||||||
|         io = Popen2IO(p.stdin, p.stdout) |         io = Popen2IO(p.stdin, p.stdout) | ||||||
|         super(PopenCmdGateway, self).__init__(io=io) |         super(PopenCmdGateway, self).__init__(io=io) | ||||||
| 
 | 
 | ||||||
|  | @ -207,6 +204,7 @@ class PopenCmdGateway(InitiatingGateway): | ||||||
|         super(PopenCmdGateway, self).exit() |         super(PopenCmdGateway, self).exit() | ||||||
|         self._popen.poll() |         self._popen.poll() | ||||||
| 
 | 
 | ||||||
|  | popen_bootstrapline = "import sys ; exec(eval(sys.stdin.readline()))" | ||||||
| class PopenGateway(PopenCmdGateway): | class PopenGateway(PopenCmdGateway): | ||||||
|     """ This Gateway provides interaction with a newly started |     """ This Gateway provides interaction with a newly started | ||||||
|         python subprocess.  |         python subprocess.  | ||||||
|  | @ -217,9 +215,8 @@ class PopenGateway(PopenCmdGateway): | ||||||
|         """ |         """ | ||||||
|         if not python: |         if not python: | ||||||
|             python = sys.executable |             python = sys.executable | ||||||
|         cmd = ('%s -u -c "import sys ; ' |         args = [str(python), '-c', popen_bootstrapline] | ||||||
|                'exec(eval(sys.stdin.readline()))"' % python) |         super(PopenGateway, self).__init__(args) | ||||||
|         super(PopenGateway, self).__init__(cmd) |  | ||||||
| 
 | 
 | ||||||
|     def _remote_bootstrap_gateway(self, io, extra=''): |     def _remote_bootstrap_gateway(self, io, extra=''): | ||||||
|         # have the subprocess use the same PYTHONPATH and py lib  |         # have the subprocess use the same PYTHONPATH and py lib  | ||||||
|  | @ -250,7 +247,10 @@ class SocketGateway(InitiatingGateway): | ||||||
|         self.port = port = int(port) |         self.port = port = int(port) | ||||||
|         self.remoteaddress = '%s:%d' % (self.host, self.port) |         self.remoteaddress = '%s:%d' % (self.host, self.port) | ||||||
|         sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) |         sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) | ||||||
|  |         try: | ||||||
|             sock.connect((host, port)) |             sock.connect((host, port)) | ||||||
|  |         except socket.gaierror: | ||||||
|  |             raise HostNotFound(str(sys.exc_info()[1])) | ||||||
|         io = SocketIO(sock) |         io = SocketIO(sock) | ||||||
|         super(SocketGateway, self).__init__(io=io) |         super(SocketGateway, self).__init__(io=io) | ||||||
| 
 | 
 | ||||||
|  | @ -280,36 +280,29 @@ class SocketGateway(InitiatingGateway): | ||||||
|         return py.execnet.SocketGateway(host, realport)  |         return py.execnet.SocketGateway(host, realport)  | ||||||
|     new_remote = classmethod(new_remote) |     new_remote = classmethod(new_remote) | ||||||
| 
 | 
 | ||||||
|  | class HostNotFound(Exception): | ||||||
|  |     pass | ||||||
|      |      | ||||||
| class SshGateway(PopenCmdGateway): | class SshGateway(PopenCmdGateway): | ||||||
|     """ This Gateway provides interaction with a remote Python process, |     """ This Gateway provides interaction with a remote Python process, | ||||||
|         established via the 'ssh' command line binary.   |         established via the 'ssh' command line binary.   | ||||||
|         The remote side needs to have a Python interpreter executable.  |         The remote side needs to have a Python interpreter executable.  | ||||||
|     """ |     """ | ||||||
|     def __init__(self, sshaddress, remotepython=None,  | 
 | ||||||
|         identity=None, ssh_config=None):  |     def __init__(self, sshaddress, remotepython=None, ssh_config=None):  | ||||||
|         """ instantiate a remote ssh process with the  |         """ instantiate a remote ssh process with the  | ||||||
|             given 'sshaddress' and remotepython version. |             given 'sshaddress' and remotepython version. | ||||||
|             you may specify an ssh_config file.  |             you may specify an ssh_config file.  | ||||||
|             DEPRECATED: you may specify an 'identity' filepath.  |  | ||||||
|         """ |         """ | ||||||
|         self.remoteaddress = sshaddress |         self.remoteaddress = sshaddress | ||||||
|         if remotepython is None: |         if remotepython is None: | ||||||
|             remotepython = "python" |             remotepython = "python" | ||||||
|         remotecmd = '%s -u -c "exec input()"' % (remotepython,) |         args = ['ssh', '-C' ] | ||||||
|         cmdline = [sshaddress, remotecmd] |  | ||||||
|         # XXX Unix style quoting |  | ||||||
|         for i in range(len(cmdline)): |  | ||||||
|             cmdline[i] = "'" + cmdline[i].replace("'", "'\\''") + "'" |  | ||||||
|         cmd = 'ssh -C' |  | ||||||
|         if identity is not None:  |  | ||||||
|             py.log._apiwarn("1.0", "pass in 'ssh_config' file instead of identity") |  | ||||||
|             cmd += ' -i %s' % (identity,) |  | ||||||
|         if ssh_config is not None: |         if ssh_config is not None: | ||||||
|             cmd += ' -F %s' % (ssh_config) |             args.extend(['-F', str(ssh_config)]) | ||||||
|         cmdline.insert(0, cmd)  |         remotecmd = '%s -c "%s"' %(remotepython, popen_bootstrapline) | ||||||
|         cmd = ' '.join(cmdline) |         args.extend([sshaddress, remotecmd]) | ||||||
|         super(SshGateway, self).__init__(cmd) |         super(SshGateway, self).__init__(args) | ||||||
|         |         | ||||||
|     def _remote_bootstrap_gateway(self, io, s=""):  |     def _remote_bootstrap_gateway(self, io, s=""):  | ||||||
|         extra = "\n".join([ |         extra = "\n".join([ | ||||||
|  | @ -317,8 +310,12 @@ class SshGateway(PopenCmdGateway): | ||||||
|             "stdouterrin_setnull()", |             "stdouterrin_setnull()", | ||||||
|             s,  |             s,  | ||||||
|         ]) |         ]) | ||||||
|  |         try: | ||||||
|             super(SshGateway, self)._remote_bootstrap_gateway(io, extra) |             super(SshGateway, self)._remote_bootstrap_gateway(io, extra) | ||||||
| 
 |         except EOFError: | ||||||
|  |             ret = self._popen.wait() | ||||||
|  |             if ret == 255: | ||||||
|  |                 raise HostNotFound(self.remoteaddress) | ||||||
| 
 | 
 | ||||||
| def stdouterrin_setnull(): | def stdouterrin_setnull(): | ||||||
|     """ redirect file descriptors 0 and 1 (and possibly 2) to /dev/null.  |     """ redirect file descriptors 0 and 1 (and possibly 2) to /dev/null.  | ||||||
|  |  | ||||||
|  | @ -0,0 +1,215 @@ | ||||||
|  | 
 | ||||||
|  | import py | ||||||
|  | import sys, os, subprocess, inspect | ||||||
|  | from py.__.execnet import gateway_base, gateway | ||||||
|  | from py.__.execnet.gateway_base import Message, Channel, ChannelFactory | ||||||
|  | 
 | ||||||
|  | def test_subprocess_interaction(anypython): | ||||||
|  |     line = gateway.popen_bootstrapline | ||||||
|  |     compile(line, 'xyz', 'exec') | ||||||
|  |     args = [str(anypython), '-c', line] | ||||||
|  |     popen = subprocess.Popen(args, bufsize=0, stderr=subprocess.STDOUT, | ||||||
|  |                              stdin=subprocess.PIPE, stdout=subprocess.PIPE) | ||||||
|  |     def send(line): | ||||||
|  |         popen.stdin.write(line.encode('ascii')) | ||||||
|  |         if sys.version_info > (3,0): # 3k still buffers  | ||||||
|  |             popen.stdin.flush() | ||||||
|  |     def receive(): | ||||||
|  |         return popen.stdout.readline().decode('ascii') | ||||||
|  | 
 | ||||||
|  |     try: | ||||||
|  |         source = py.code.Source(read_write_loop, "read_write_loop()") | ||||||
|  |         repr_source = repr(str(source)) + "\n" | ||||||
|  |         sendline = repr_source | ||||||
|  |         send(sendline)     | ||||||
|  |         s = receive() | ||||||
|  |         assert s == "ok\n" | ||||||
|  |         send("hello\n") | ||||||
|  |         s = receive() | ||||||
|  |         assert s == "received: hello\n" | ||||||
|  |         send("world\n") | ||||||
|  |         s = receive() | ||||||
|  |         assert s == "received: world\n" | ||||||
|  |     finally: | ||||||
|  |         popen.stdin.close() | ||||||
|  |         popen.stdout.close() | ||||||
|  |         popen.wait() | ||||||
|  | 
 | ||||||
|  | def read_write_loop(): | ||||||
|  |     import os, sys | ||||||
|  |     sys.stdout.write("ok\n") | ||||||
|  |     sys.stdout.flush() | ||||||
|  |     while 1: | ||||||
|  |         try: | ||||||
|  |             line = sys.stdin.readline() | ||||||
|  |             sys.stdout.write("received: %s" % line) | ||||||
|  |             sys.stdout.flush() | ||||||
|  |         except (IOError, EOFError): | ||||||
|  |             break | ||||||
|  | 
 | ||||||
|  | def pytest_generate_tests(metafunc): | ||||||
|  |     if 'anypython' in metafunc.funcargnames: | ||||||
|  |         for name in 'python3.1', 'python2.4', 'python2.5', 'python2.6': | ||||||
|  |             metafunc.addcall(id=name, param=name) | ||||||
|  | 
 | ||||||
|  | def pytest_funcarg__anypython(request): | ||||||
|  |     name = request.param   | ||||||
|  |     executable = py.path.local.sysfind(name) | ||||||
|  |     if executable is None: | ||||||
|  |         py.test.skip("no %s found" % (name,)) | ||||||
|  |     return executable | ||||||
|  | 
 | ||||||
|  | def test_io_message(anypython, tmpdir): | ||||||
|  |     check = tmpdir.join("check.py") | ||||||
|  |     check.write(py.code.Source(gateway_base, """ | ||||||
|  |         try: | ||||||
|  |             from io import BytesIO  | ||||||
|  |         except ImportError: | ||||||
|  |             from StringIO import StringIO as BytesIO | ||||||
|  |         import tempfile | ||||||
|  |         temp_out = BytesIO() | ||||||
|  |         temp_in = BytesIO() | ||||||
|  |         io = Popen2IO(temp_out, temp_in) | ||||||
|  |         for i, msg_cls in Message._types.items(): | ||||||
|  |             print ("checking %s %s" %(i, msg_cls)) | ||||||
|  |             for data in "hello", "hello".encode('ascii'): | ||||||
|  |                 msg1 = msg_cls(i, data) | ||||||
|  |                 msg1.writeto(io) | ||||||
|  |                 x = io.outfile.getvalue() | ||||||
|  |                 io.outfile.truncate(0) | ||||||
|  |                 io.outfile.seek(0) | ||||||
|  |                 io.infile.seek(0) | ||||||
|  |                 io.infile.write(x) | ||||||
|  |                 io.infile.seek(0) | ||||||
|  |                 msg2 = Message.readfrom(io) | ||||||
|  |                 assert msg1.channelid == msg2.channelid, (msg1, msg2) | ||||||
|  |                 assert msg1.data == msg2.data | ||||||
|  |         print ("all passed") | ||||||
|  |     """)) | ||||||
|  |     #out = py.process.cmdexec("%s %s" %(executable,check)) | ||||||
|  |     out = anypython.sysexec(check) | ||||||
|  |     print (out) | ||||||
|  |     assert "all passed" in out | ||||||
|  | 
 | ||||||
|  | def test_popen_io(anypython, tmpdir): | ||||||
|  |     check = tmpdir.join("check.py") | ||||||
|  |     check.write(py.code.Source(gateway_base, """ | ||||||
|  |         do_exec(Popen2IO.server_stmt, globals()) | ||||||
|  |         io.write("hello".encode('ascii')) | ||||||
|  |         s = io.read(1) | ||||||
|  |         assert s == "x".encode('ascii') | ||||||
|  |     """)) | ||||||
|  |     from subprocess import Popen, PIPE | ||||||
|  |     args = [str(anypython), str(check)] | ||||||
|  |     proc = Popen(args, stdin=PIPE, stdout=PIPE, stderr=PIPE) | ||||||
|  |     proc.stdin.write("x".encode('ascii')) | ||||||
|  |     stdout, stderr = proc.communicate() | ||||||
|  |     print (stderr) | ||||||
|  |     ret = proc.wait() | ||||||
|  |     assert "hello".encode('ascii') in stdout | ||||||
|  | 
 | ||||||
|  | 
 | ||||||
|  | def test_rinfo_source(anypython, tmpdir): | ||||||
|  |     check = tmpdir.join("check.py") | ||||||
|  |     check.write(py.code.Source(""" | ||||||
|  |         class Channel: | ||||||
|  |             def send(self, data): | ||||||
|  |                 assert eval(repr(data), {}) == data | ||||||
|  |         channel = Channel() | ||||||
|  |         """, gateway.rinfo_source, """ | ||||||
|  |         print ('all passed') | ||||||
|  |     """)) | ||||||
|  |     out = anypython.sysexec(check) | ||||||
|  |     print (out) | ||||||
|  |     assert "all passed" in out | ||||||
|  | 
 | ||||||
|  | def test_geterrortext(anypython, tmpdir): | ||||||
|  |     check = tmpdir.join("check.py") | ||||||
|  |     check.write(py.code.Source(gateway_base, """ | ||||||
|  |         class Arg: | ||||||
|  |             pass | ||||||
|  |         errortext = geterrortext((Arg, "1", 4)) | ||||||
|  |         assert "Arg" in errortext | ||||||
|  |         import sys | ||||||
|  |         try: | ||||||
|  |             raise ValueError("17") | ||||||
|  |         except ValueError: | ||||||
|  |             excinfo = sys.exc_info() | ||||||
|  |             s = geterrortext(excinfo) | ||||||
|  |             assert "17" in s | ||||||
|  |             print ("all passed") | ||||||
|  |     """)) | ||||||
|  |     out = anypython.sysexec(check) | ||||||
|  |     print (out) | ||||||
|  |     assert "all passed" in out | ||||||
|  | 
 | ||||||
|  | def test_getsource_import_modules():  | ||||||
|  |     for dottedname in gateway.startup_modules:  | ||||||
|  |         yield gateway.getsource, dottedname  | ||||||
|  | 
 | ||||||
|  | def test_getsource_no_colision():  | ||||||
|  |     seen = {} | ||||||
|  |     for dottedname in gateway.startup_modules:  | ||||||
|  |         mod = __import__(dottedname, None, None, ['__doc__']) | ||||||
|  |         for name, value in vars(mod).items():  | ||||||
|  |             if py.std.inspect.isclass(value):  | ||||||
|  |                 if name in seen:  | ||||||
|  |                     olddottedname, oldval = seen[name] | ||||||
|  |                     if oldval is not value:  | ||||||
|  |                         py.test.fail("duplicate class %r in %s and %s" %  | ||||||
|  |                                      (name, dottedname, olddottedname))  | ||||||
|  |                 seen[name] = (dottedname, value)  | ||||||
|  | 
 | ||||||
|  | def test_stdouterrin_setnull(): | ||||||
|  |     cap = py.io.StdCaptureFD() | ||||||
|  |     from py.__.execnet.gateway import stdouterrin_setnull | ||||||
|  |     stdouterrin_setnull() | ||||||
|  |     import os | ||||||
|  |     os.write(1, "hello".encode('ascii')) | ||||||
|  |     if os.name == "nt": | ||||||
|  |         os.write(2, "world") | ||||||
|  |     os.read(0, 1) | ||||||
|  |     out, err = cap.reset() | ||||||
|  |     assert not out | ||||||
|  |     assert not err | ||||||
|  | 
 | ||||||
|  | 
 | ||||||
|  | class TestMessage: | ||||||
|  |     def test_wire_protocol(self): | ||||||
|  |         for cls in Message._types.values(): | ||||||
|  |             one = py.io.BytesIO() | ||||||
|  |             data = '23'.encode('ascii') | ||||||
|  |             cls(42, data).writeto(one) | ||||||
|  |             two = py.io.BytesIO(one.getvalue()) | ||||||
|  |             msg = Message.readfrom(two) | ||||||
|  |             assert isinstance(msg, cls) | ||||||
|  |             assert msg.channelid == 42 | ||||||
|  |             assert msg.data == data | ||||||
|  |             assert isinstance(repr(msg), str) | ||||||
|  |             # == "<Message.%s channelid=42 '23'>" %(msg.__class__.__name__, ) | ||||||
|  | 
 | ||||||
|  | class TestPureChannel: | ||||||
|  |     def setup_method(self, method): | ||||||
|  |         self.fac = ChannelFactory(None) | ||||||
|  | 
 | ||||||
|  |     def test_factory_create(self): | ||||||
|  |         chan1 = self.fac.new() | ||||||
|  |         assert chan1.id == 1 | ||||||
|  |         chan2 = self.fac.new() | ||||||
|  |         assert chan2.id == 3 | ||||||
|  | 
 | ||||||
|  |     def test_factory_getitem(self): | ||||||
|  |         chan1 = self.fac.new() | ||||||
|  |         assert self.fac._channels[chan1.id] == chan1 | ||||||
|  |         chan2 = self.fac.new() | ||||||
|  |         assert self.fac._channels[chan2.id] == chan2 | ||||||
|  | 
 | ||||||
|  |     def test_channel_timeouterror(self): | ||||||
|  |         channel = self.fac.new() | ||||||
|  |         py.test.raises(IOError, channel.waitclose, timeout=0.01) | ||||||
|  | 
 | ||||||
|  |     def test_channel_makefile_incompatmode(self): | ||||||
|  |         channel = self.fac.new() | ||||||
|  |         py.test.raises(ValueError, 'channel.makefile("rw")') | ||||||
|  | 
 | ||||||
|  | 
 | ||||||
|  | @ -1,191 +1,13 @@ | ||||||
| from __future__ import generators | from __future__ import generators | ||||||
| import os, sys, time, signal | import os, sys, time | ||||||
| import py | import py | ||||||
| from py.__.execnet.gateway_base import Message, Channel, ChannelFactory |  | ||||||
| from py.__.execnet.gateway_base import ExecnetAPI, queue, Popen2IO |  | ||||||
| from py.__.execnet import gateway_base, gateway | from py.__.execnet import gateway_base, gateway | ||||||
|  | queue = py.builtin._tryimport('queue', 'Queue') | ||||||
| 
 | 
 | ||||||
| from py.__.execnet.gateway import startup_modules, getsource  |  | ||||||
| pytest_plugins = "pytester" | pytest_plugins = "pytester" | ||||||
| 
 | 
 | ||||||
| TESTTIMEOUT = 10.0 # seconds | TESTTIMEOUT = 10.0 # seconds | ||||||
| 
 | 
 | ||||||
| def pytest_generate_tests(metafunc): |  | ||||||
|     if 'pythonpath' in metafunc.funcargnames: |  | ||||||
|         for name in 'python2.4', 'python2.5', 'python2.6', 'python3.1': |  | ||||||
|             metafunc.addcall(id=name, param=name) |  | ||||||
| 
 |  | ||||||
| def pytest_funcarg__pythonpath(request): |  | ||||||
|     name = request.param   |  | ||||||
|     executable = py.path.local.sysfind(name) |  | ||||||
|     if executable is None: |  | ||||||
|         py.test.skip("no %s found" % (name,)) |  | ||||||
|     return executable |  | ||||||
| 
 |  | ||||||
| def test_io_message(pythonpath, tmpdir): |  | ||||||
|     check = tmpdir.join("check.py") |  | ||||||
|     check.write(py.code.Source(gateway_base, """ |  | ||||||
|         try: |  | ||||||
|             from io import BytesIO  |  | ||||||
|         except ImportError: |  | ||||||
|             from StringIO import StringIO as BytesIO |  | ||||||
|         import tempfile |  | ||||||
|         temp_out = BytesIO() |  | ||||||
|         temp_in = BytesIO() |  | ||||||
|         io = Popen2IO(temp_out, temp_in) |  | ||||||
|         for i, msg_cls in Message._types.items(): |  | ||||||
|             print ("checking %s %s" %(i, msg_cls)) |  | ||||||
|             for data in "hello", "hello".encode('ascii'): |  | ||||||
|                 msg1 = msg_cls(i, data) |  | ||||||
|                 msg1.writeto(io) |  | ||||||
|                 x = io.outfile.getvalue() |  | ||||||
|                 io.outfile.truncate(0) |  | ||||||
|                 io.outfile.seek(0) |  | ||||||
|                 io.infile.seek(0) |  | ||||||
|                 io.infile.write(x) |  | ||||||
|                 io.infile.seek(0) |  | ||||||
|                 msg2 = Message.readfrom(io) |  | ||||||
|                 assert msg1.channelid == msg2.channelid, (msg1, msg2) |  | ||||||
|                 assert msg1.data == msg2.data |  | ||||||
|         print ("all passed") |  | ||||||
|     """)) |  | ||||||
|     #out = py.process.cmdexec("%s %s" %(executable,check)) |  | ||||||
|     out = pythonpath.sysexec(check) |  | ||||||
|     print (out) |  | ||||||
|     assert "all passed" in out |  | ||||||
| 
 |  | ||||||
| def test_popen_io(pythonpath, tmpdir): |  | ||||||
|     check = tmpdir.join("check.py") |  | ||||||
|     check.write(py.code.Source(gateway_base, """ |  | ||||||
|         do_exec(Popen2IO.server_stmt, globals()) |  | ||||||
|         io.write("hello".encode('ascii')) |  | ||||||
|         s = io.read(1) |  | ||||||
|         assert s == "x".encode('ascii') |  | ||||||
|     """)) |  | ||||||
|     from subprocess import Popen, PIPE |  | ||||||
|     args = [str(pythonpath), str(check)] |  | ||||||
|     proc = Popen(args, stdin=PIPE, stdout=PIPE, stderr=PIPE) |  | ||||||
|     proc.stdin.write("x".encode('ascii')) |  | ||||||
|     stdout, stderr = proc.communicate() |  | ||||||
|     print (stderr) |  | ||||||
|     ret = proc.wait() |  | ||||||
|     assert "hello".encode('ascii') in stdout |  | ||||||
| 
 |  | ||||||
| def test_rinfo_source(pythonpath, tmpdir): |  | ||||||
|     check = tmpdir.join("check.py") |  | ||||||
|     check.write(py.code.Source(""" |  | ||||||
|         class Channel: |  | ||||||
|             def send(self, data): |  | ||||||
|                 assert eval(repr(data), {}) == data |  | ||||||
|         channel = Channel() |  | ||||||
|         """, gateway.rinfo_source, """ |  | ||||||
|         print ('all passed') |  | ||||||
|     """)) |  | ||||||
|     out = pythonpath.sysexec(check) |  | ||||||
|     print (out) |  | ||||||
|     assert "all passed" in out |  | ||||||
| 
 |  | ||||||
| def test_geterrortext(pythonpath, tmpdir): |  | ||||||
|     check = tmpdir.join("check.py") |  | ||||||
|     check.write(py.code.Source(gateway_base, """ |  | ||||||
|         class Arg: |  | ||||||
|             pass |  | ||||||
|         errortext = geterrortext((Arg, "1", 4)) |  | ||||||
|         assert "Arg" in errortext |  | ||||||
|         import sys |  | ||||||
|         try: |  | ||||||
|             raise ValueError("17") |  | ||||||
|         except ValueError: |  | ||||||
|             excinfo = sys.exc_info() |  | ||||||
|             s = geterrortext(excinfo) |  | ||||||
|             assert "17" in s |  | ||||||
|             print ("all passed") |  | ||||||
|     """)) |  | ||||||
|     out = pythonpath.sysexec(check) |  | ||||||
|     print (out) |  | ||||||
|     assert "all passed" in out |  | ||||||
| 
 |  | ||||||
| class TestExecnetEvents: |  | ||||||
|     def test_popengateway(self, _pytest): |  | ||||||
|         rec = _pytest.gethookrecorder(ExecnetAPI) |  | ||||||
|         gw = py.execnet.PopenGateway() |  | ||||||
|         call = rec.popcall("pyexecnet_gateway_init")  |  | ||||||
|         assert call.gateway == gw |  | ||||||
|         gw.exit() |  | ||||||
|         call = rec.popcall("pyexecnet_gateway_exit") |  | ||||||
|         assert call.gateway == gw |  | ||||||
| 
 |  | ||||||
| 
 |  | ||||||
| def test_getsource_import_modules():  |  | ||||||
|     for dottedname in startup_modules:  |  | ||||||
|         yield getsource, dottedname  |  | ||||||
| 
 |  | ||||||
| def test_getsource_no_colision():  |  | ||||||
|     seen = {} |  | ||||||
|     for dottedname in startup_modules:  |  | ||||||
|         mod = __import__(dottedname, None, None, ['__doc__']) |  | ||||||
|         for name, value in vars(mod).items():  |  | ||||||
|             if py.std.inspect.isclass(value):  |  | ||||||
|                 if name in seen:  |  | ||||||
|                     olddottedname, oldval = seen[name] |  | ||||||
|                     if oldval is not value:  |  | ||||||
|                         py.test.fail("duplicate class %r in %s and %s" %  |  | ||||||
|                                      (name, dottedname, olddottedname))  |  | ||||||
|                 seen[name] = (dottedname, value)  |  | ||||||
| 
 |  | ||||||
| def test_stdouterrin_setnull(): |  | ||||||
|     cap = py.io.StdCaptureFD() |  | ||||||
|     from py.__.execnet.gateway import stdouterrin_setnull |  | ||||||
|     stdouterrin_setnull() |  | ||||||
|     import os |  | ||||||
|     os.write(1, "hello".encode('ascii')) |  | ||||||
|     if os.name == "nt": |  | ||||||
|         os.write(2, "world") |  | ||||||
|     os.read(0, 1) |  | ||||||
|     out, err = cap.reset() |  | ||||||
|     assert not out |  | ||||||
|     assert not err |  | ||||||
| 
 |  | ||||||
| 
 |  | ||||||
| class TestMessage: |  | ||||||
|     def test_wire_protocol(self): |  | ||||||
|         for cls in Message._types.values(): |  | ||||||
|             one = py.io.BytesIO() |  | ||||||
|             data = '23'.encode('ascii') |  | ||||||
|             cls(42, data).writeto(one) |  | ||||||
|             two = py.io.BytesIO(one.getvalue()) |  | ||||||
|             msg = Message.readfrom(two) |  | ||||||
|             assert isinstance(msg, cls) |  | ||||||
|             assert msg.channelid == 42 |  | ||||||
|             assert msg.data == data |  | ||||||
|             assert isinstance(repr(msg), str) |  | ||||||
|             # == "<Message.%s channelid=42 '23'>" %(msg.__class__.__name__, ) |  | ||||||
| 
 |  | ||||||
| class TestPureChannel: |  | ||||||
|     def setup_method(self, method): |  | ||||||
|         self.fac = ChannelFactory(None) |  | ||||||
| 
 |  | ||||||
|     def test_factory_create(self): |  | ||||||
|         chan1 = self.fac.new() |  | ||||||
|         assert chan1.id == 1 |  | ||||||
|         chan2 = self.fac.new() |  | ||||||
|         assert chan2.id == 3 |  | ||||||
| 
 |  | ||||||
|     def test_factory_getitem(self): |  | ||||||
|         chan1 = self.fac.new() |  | ||||||
|         assert self.fac._channels[chan1.id] == chan1 |  | ||||||
|         chan2 = self.fac.new() |  | ||||||
|         assert self.fac._channels[chan2.id] == chan2 |  | ||||||
| 
 |  | ||||||
|     def test_channel_timeouterror(self): |  | ||||||
|         channel = self.fac.new() |  | ||||||
|         py.test.raises(IOError, channel.waitclose, timeout=0.01) |  | ||||||
| 
 |  | ||||||
|     def test_channel_makefile_incompatmode(self): |  | ||||||
|         channel = self.fac.new() |  | ||||||
|         py.test.raises(ValueError, 'channel.makefile("rw")') |  | ||||||
| 
 |  | ||||||
| 
 |  | ||||||
| class PopenGatewayTestSetup: | class PopenGatewayTestSetup: | ||||||
|     def setup_class(cls): |     def setup_class(cls): | ||||||
|         cls.gw = py.execnet.PopenGateway() |         cls.gw = py.execnet.PopenGateway() | ||||||
|  | @ -648,6 +470,7 @@ class TestPopenGateway(PopenGatewayTestSetup, BasicRemoteExecution): | ||||||
|         py.test.raises(EOFError, channel.send, None) |         py.test.raises(EOFError, channel.send, None) | ||||||
|         py.test.raises(EOFError, channel.receive) |         py.test.raises(EOFError, channel.receive) | ||||||
| 
 | 
 | ||||||
|  | @py.test.mark.xfail | ||||||
| def test_endmarker_delivery_on_remote_killterm(): | def test_endmarker_delivery_on_remote_killterm(): | ||||||
|     if not hasattr(py.std.os, 'kill'): |     if not hasattr(py.std.os, 'kill'): | ||||||
|         py.test.skip("no os.kill()") |         py.test.skip("no os.kill()") | ||||||
|  | @ -664,8 +487,8 @@ def test_endmarker_delivery_on_remote_killterm(): | ||||||
|         err = channel._getremoteerror() |         err = channel._getremoteerror() | ||||||
|     finally: |     finally: | ||||||
|         gw.exit() |         gw.exit() | ||||||
|     py.test.skip("provide information on causes/signals " |     assert "killed" in str(err) | ||||||
|                  "of dying remote gateways") |     assert "15" in str(err) | ||||||
| 
 | 
 | ||||||
| 
 | 
 | ||||||
| class SocketGatewaySetup: | class SocketGatewaySetup: | ||||||
|  | @ -675,6 +498,10 @@ class SocketGatewaySetup: | ||||||
|         cls.gw = py.execnet.SocketGateway.new_remote(cls.proxygw, |         cls.gw = py.execnet.SocketGateway.new_remote(cls.proxygw, | ||||||
|                                                      ("127.0.0.1", 0) |                                                      ("127.0.0.1", 0) | ||||||
|                                                      )  |                                                      )  | ||||||
|  |     def test_host_not_found(self): | ||||||
|  |         py.test.raises(py.execnet.HostNotFound,  | ||||||
|  |                 'py.execnet.SocketGateway("qowieuqowe", 9000)' | ||||||
|  |         ) | ||||||
| 
 | 
 | ||||||
| ##    def teardown_class(cls): | ##    def teardown_class(cls): | ||||||
| ##        cls.gw.exit() | ##        cls.gw.exit() | ||||||
|  | @ -695,27 +522,16 @@ class TestSshGateway(BasicRemoteExecution): | ||||||
|             "Host alias123\n" |             "Host alias123\n" | ||||||
|             "   HostName %s\n" % self.sshhost) |             "   HostName %s\n" % self.sshhost) | ||||||
|         gw = py.execnet.SshGateway("alias123", ssh_config=ssh_config) |         gw = py.execnet.SshGateway("alias123", ssh_config=ssh_config) | ||||||
|         assert gw._cmd.find("-F") != -1 |  | ||||||
|         assert gw._cmd.find(str(ssh_config)) != -1 |  | ||||||
|         pid = gw.remote_exec("import os ; channel.send(os.getpid())").receive() |         pid = gw.remote_exec("import os ; channel.send(os.getpid())").receive() | ||||||
|         gw.exit() |         gw.exit() | ||||||
| 
 | 
 | ||||||
|     def test_sshaddress(self): |     def test_sshaddress(self): | ||||||
|         assert self.gw.remoteaddress == self.sshhost |         assert self.gw.remoteaddress == self.sshhost | ||||||
| 
 | 
 | ||||||
|     @py.test.mark.xfail # XXX ssh-gateway error handling |  | ||||||
|     def test_connexion_failes_on_non_existing_hosts(self): |     def test_connexion_failes_on_non_existing_hosts(self): | ||||||
|         py.test.raises(IOError,  |         py.test.raises(py.execnet.HostNotFound,  | ||||||
|             "py.execnet.SshGateway('nowhere.codespeak.net')") |             "py.execnet.SshGateway('nowhere.codespeak.net')") | ||||||
| 
 | 
 | ||||||
|     @py.test.mark.xfail # "XXX ssh-gateway error handling" |  | ||||||
|     def test_deprecated_identity(self): |  | ||||||
|         py.test.deprecated_call( |  | ||||||
|             py.test.raises, IOError,  |  | ||||||
|                 py.execnet.SshGateway, |  | ||||||
|                     'nowhere.codespeak.net', identity='qwe') |  | ||||||
| 
 |  | ||||||
| 
 |  | ||||||
| def test_threads(): | def test_threads(): | ||||||
|     gw = py.execnet.PopenGateway() |     gw = py.execnet.PopenGateway() | ||||||
|     gw.remote_init_threads(3) |     gw.remote_init_threads(3) | ||||||
|  | @ -735,25 +551,16 @@ def test_threads_twice(): | ||||||
|     py.test.raises(IOError, gw.remote_init_threads, 3) |     py.test.raises(IOError, gw.remote_init_threads, 3) | ||||||
|     gw.exit()  |     gw.exit()  | ||||||
| 
 | 
 | ||||||
|  | class TestExecnetEvents: | ||||||
|  |     def test_popengateway(self, _pytest): | ||||||
|  |         rec = _pytest.gethookrecorder(gateway_base.ExecnetAPI) | ||||||
|  |         gw = py.execnet.PopenGateway() | ||||||
|  |         call = rec.popcall("pyexecnet_gateway_init")  | ||||||
|  |         assert call.gateway == gw | ||||||
|  |         gw.exit() | ||||||
|  |         call = rec.popcall("pyexecnet_gateway_exit") | ||||||
|  |         assert call.gateway == gw | ||||||
| 
 | 
 | ||||||
| def test_nodebug(): | def test_nodebug(): | ||||||
|     from py.__.execnet import gateway_base |     from py.__.execnet import gateway_base | ||||||
|     assert not gateway_base.debug |     assert not gateway_base.debug | ||||||
| 
 |  | ||||||
| def test_channel_endmarker_remote_killterm(): |  | ||||||
|     gw = py.execnet.PopenGateway() |  | ||||||
|     try: |  | ||||||
|         q = queue.Queue() |  | ||||||
|         channel = gw.remote_exec(''' |  | ||||||
|             import os |  | ||||||
|             os.kill(os.getpid(), 15) |  | ||||||
|         ''')  |  | ||||||
|         channel.setcallback(q.put, endmarker=999) |  | ||||||
|         val = q.get(TESTTIMEOUT) |  | ||||||
|         assert val == 999 |  | ||||||
|         err = channel._getremoteerror() |  | ||||||
|     finally: |  | ||||||
|         gw.exit() |  | ||||||
|     py.test.skip("provide information on causes/signals " |  | ||||||
|                  "of dying remote gateways") |  | ||||||
| 
 |  | ||||||
|  |  | ||||||
		Loading…
	
		Reference in New Issue