diff --git a/py/__init__.py b/py/__init__.py index 6ea490bdb..4ed60afb9 100644 --- a/py/__init__.py +++ b/py/__init__.py @@ -27,8 +27,8 @@ version = "1.0.0a1" initpkg(__name__, description = "pylib and py.test: agile development and test support library", - revision = int('$LastChangedRevision: 62252 $'.split(':')[1][:-1]), - lastchangedate = '$LastChangedDate: 2009-02-27 20:56:51 +0100 (Fri, 27 Feb 2009) $', + revision = int('$LastChangedRevision: 63027 $'.split(':')[1][:-1]), + lastchangedate = '$LastChangedDate: 2009-03-18 12:18:39 +0100 (Wed, 18 Mar 2009) $', version = version, url = "http://pylib.org", download_url = "http://codespeak.net/py/0.9.2/download.html", @@ -151,6 +151,9 @@ initpkg(__name__, 'execnet.SocketGateway' : ('./execnet/register.py', 'SocketGateway'), 'execnet.PopenGateway' : ('./execnet/register.py', 'PopenGateway'), 'execnet.SshGateway' : ('./execnet/register.py', 'SshGateway'), + 'execnet.GatewaySpec' : ('./execnet/gwmanage.py', 'GatewaySpec'), + 'execnet.MultiGateway' : ('./execnet/gwmanage.py', 'MultiGateway'), + 'execnet.MultiChannel' : ('./execnet/gwmanage.py', 'MultiChannel'), # execnet scripts 'execnet.RSync' : ('./execnet/rsync.py', 'RSync'), diff --git a/py/doc/execnet.txt b/py/doc/execnet.txt index 626f98b5b..76ab622b5 100644 --- a/py/doc/execnet.txt +++ b/py/doc/execnet.txt @@ -38,18 +38,11 @@ Basic Features With ''py.execnet'' you get the means -- to navigate through the network with Process, Thread, SSH - and Socket- gateways that allow you ... - -- to distribute your program across a network and define - communication protocols from the client side, making - server maintenance superflous. In fact, there is no such - thing as a server. It's just another computer ... if it - doesn't run in a kernel-level jail [#]_ in which case - even that is virtualized. +- to execute python code fragements in remote processes and +- to interchange data between asynchronously executing code fragments -Available Gateways/Connection methods +Available Gateways ----------------------------------------- You may use one of the following connection methods: @@ -68,10 +61,11 @@ You may use one of the following connection methods: script. You can run this "server script" without having the py lib installed on that remote system. -Remote execution approach + +executing code remotely ------------------------------------- -All gateways offer one main high level function: +All gateways offer remote code execution via this high level function: def remote_exec(source): """return channel object for communicating with the asynchronously @@ -94,29 +88,20 @@ an example: >>> remote_pid != py.std.os.getpid() True -`remote_exec` implements the idea to ``determine -protocol and remote code from the client/local side``. -This makes distributing a program run in an ad-hoc -manner (using e.g. :api:`py.execnet.SshGateway`) very easy. - -You should not need to maintain software on the other sides -you are running your code at, other than the Python -executable itself. - .. _`Channel`: .. _`channel-api`: .. _`exchange data`: -The **Channel** interface for exchanging data across gateways +Bidirectionally exchange data between hosts ------------------------------------------------------------- -While executing custom strings on "the other side" is simple enough -it is often tricky to deal with. Therefore we want a way -to send data items to and fro between the distributedly running -program. The idea is to inject a Channel object for each -execution of source code. This Channel object allows two -program parts to send data to each other. -Here is the current interface:: +A channel object allows to send and receive data between +two asynchronously running programs. When calling +`remote_exec` you will get a channel object back and +the code fragement running on the other side will +see a channel object in its global namespace. + +Here is the interface of channel objects:: # # API for sending and receiving anonymous values @@ -125,7 +110,7 @@ Here is the current interface:: sends the given item to the other side of the channel, possibly blocking if the sender queue is full. Note that items need to be marshallable (all basic - python types are): + python types are). channel.receive(): receives an item that was sent from the other side, @@ -146,25 +131,93 @@ Here is the current interface:: A remote side blocking on receive() on this channel will get woken up and see an EOFError exception. +Instantiating a gateway from a string-specification +--------------------------------------------------------- -The complete Fileserver example -........................................ +To specify Gateways with a String:: + >>> import py + >>> gwspec = py.execnet.GatewaySpec("popen") + >>> gw = gwspec.makegateway() + >>> ex = gw.remote_exec("import sys ; channel.send(sys.executable)").receive() + >>> assert ex == py.std.sys.executable + >>> + +current gateway types and specifications ++++++++++++++++++++++++++++++++++++++++++++++++ + ++------------------------+-------------------------------------------+ +| ssh host | ssh:host:pythonexecutable:path | ++------------------------+-------------------------------------------+ +| local subprocess | popen:python_executable:path | ++------------------------+-------------------------------------------+ +| remote socket process | socket:host:port:python_executable:path | ++------------------------+-------------------------------------------+ + +examples of valid specifications +++++++++++++++++++++++++++++++++++++++ + +``ssh:wyvern:python2.4`` signifies a connection to a Python process on the machine reached via "ssh wyvern", current dir will be the 'pyexecnet-cache' subdirectory. + +``socket:192.168.1.4`` signifies a connection to a Python Socket server process to the given IP on the default port 8888; current dir will be the 'pyexecnet-cache' directory. + +``popen:python2.5`` signifies a connection to a python2.5 subprocess; current dir will be the current dir of the instantiator. + +``popen::pytest-cache1`` signifies a connection to a subprocess using ``sys.executable``; current dir will be the `pytest-cache1`. + + +Examples for execnet usage +------------------------------------------- + +Example: compare cwd() of Popen Gateways +++++++++++++++++++++++++++++++++++++++++ + +A PopenGateway has the same working directory as the instantiatior:: + + >>> import py, os + >>> gw = py.execnet.PopenGateway() + >>> ch = gw.remote_exec("import os; channel.send(os.getcwd())") + >>> res = ch.receive() + >>> assert res == os.getcwd() + >>> gw.exit() + +Example: multichannels +++++++++++++++++++++++++++++++++++++++++ + +MultiChannels manage 1-n execution and communication: + + >>> import py + >>> ch1 = py.execnet.PopenGateway().remote_exec("channel.send(1)") + >>> ch2 = py.execnet.PopenGateway().remote_exec("channel.send(2)") + >>> mch = py.execnet.MultiChannel([ch1, ch2]) + >>> l = mch.receive_each() + >>> assert len(l) == 2 + >>> assert 1 in l + >>> assert 2 in l + +MultiGateways help with sending code to multiple remote places: + + >>> import py + >>> mgw = py.execnet.MultiGateway([py.execnet.PopenGateway() for x in range(2)]) + >>> mch = mgw.remote_exec("import os; channel.send(os.getcwd())") + >>> res = mch.receive_each() + >>> assert res == [os.getcwd()] * 2, res + >>> mgw.exit() + + +Example: receiving file contents from remote places +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ problem: retrieving contents of remote files:: import py - contentserverbootstrap = py.code.Source( - """ + # open a gateway to a fresh child process + gw = py.execnet.SshGateway('codespeak.net') + channel = gw.remote_exec(""" for fn in channel: f = open(fn, 'rb') - try: - channel.send(f.read()) - finally: - f.close() - """) - # open a gateway to a fresh child process - contentgateway = py.execnet.SshGateway('codespeak.net') - channel = contentgateway.remote_exec(contentserverbootstrap) + channel.send(f.read()) + f.close() + """) for fn in somefilelist: channel.send(fn) @@ -218,7 +271,4 @@ Here are 20 lines of code making the above triangle happen:: gw = py.execnet.SocketGateway('localhost', cls.port) print "initialized socket gateway to port", cls.port -.. [#] There is an interesting emerging `Jail`_ linux technology - as well as a host of others, of course. -.. _`Jail`: http://books.rsbac.org/unstable/x2223.html diff --git a/py/execnet/gwmanage.py b/py/execnet/gwmanage.py index cb4f9e3dd..1e87c808d 100644 --- a/py/execnet/gwmanage.py +++ b/py/execnet/gwmanage.py @@ -140,6 +140,9 @@ class MultiGateway: for gw in self.gateways: channels.append(gw.remote_exec(source)) return MultiChannel(channels) + def exit(self): + for gw in self.gateways: + gw.exit() class GatewayManager: RemoteError = RemoteError diff --git a/py/execnet/testing/test_gwmanage.py b/py/execnet/testing/test_gwmanage.py index 1de940062..a864c3263 100644 --- a/py/execnet/testing/test_gwmanage.py +++ b/py/execnet/testing/test_gwmanage.py @@ -8,103 +8,7 @@ """ import py -from py.__.execnet.gwmanage import GatewaySpec, GatewayManager -from py.__.execnet.gwmanage import HostRSync - -class TestGatewaySpec: - """ - socket:hostname:port:path SocketGateway - popen[-executable][:path] PopenGateway - [ssh:]spec:path SshGateway - * [SshGateway] - """ - def test_popen(self): - for python in ('', 'python2.4'): - for joinpath in ('', 'abc', 'ab:cd', '/x/y'): - s = ":".join(["popen", python, joinpath]) - print s - spec = GatewaySpec(s) - assert spec.address == "popen" - assert spec.python == python - assert spec.joinpath == joinpath - assert spec.type == "popen" - spec2 = GatewaySpec("popen" + joinpath) - self._equality(spec, spec2) - - def test_ssh(self): - for prefix in ('ssh', ''): # ssh is default - for hostpart in ('x.y', 'xyz@x.y'): - for python in ('python', 'python2.5'): - for joinpath in ('', 'abc', 'ab:cd', '/tmp'): - specstring = ":".join([prefix, hostpart, python, joinpath]) - if specstring[0] == ":": - specstring = specstring[1:] - print specstring - spec = GatewaySpec(specstring) - assert spec.address == hostpart - assert spec.python == python - if joinpath: - assert spec.joinpath == joinpath - else: - assert spec.joinpath == "pyexecnetcache" - assert spec.type == "ssh" - spec2 = GatewaySpec(specstring) - self._equality(spec, spec2) - - def test_socket(self): - for hostpart in ('x.y', 'x', 'popen'): - for port in ":80", ":1000": - for joinpath in ('', ':abc', ':abc:de'): - spec = GatewaySpec("socket:" + hostpart + port + joinpath) - assert spec.address == (hostpart, int(port[1:])) - if joinpath[1:]: - assert spec.joinpath == joinpath[1:] - else: - assert spec.joinpath == "pyexecnetcache" - assert spec.type == "socket" - spec2 = GatewaySpec("socket:" + hostpart + port + joinpath) - self._equality(spec, spec2) - - def _equality(self, spec1, spec2): - assert spec1 != spec2 - assert hash(spec1) != hash(spec2) - assert not (spec1 == spec2) - - -class TestGatewaySpecAPI: - def test_popen_nopath_makegateway(self, testdir): - spec = GatewaySpec("popen") - gw = spec.makegateway() - p = gw.remote_exec("import os; channel.send(os.getcwd())").receive() - curdir = py.std.os.getcwd() - assert curdir == p - gw.exit() - - def test_popen_makegateway(self, testdir): - spec = GatewaySpec("popen::" + str(testdir.tmpdir)) - gw = spec.makegateway() - p = gw.remote_exec("import os; channel.send(os.getcwd())").receive() - assert spec.joinpath == p - gw.exit() - - def test_popen_makegateway_python(self, testdir): - spec = GatewaySpec("popen:%s" % py.std.sys.executable) - gw = spec.makegateway() - res = gw.remote_exec("import sys ; channel.send(sys.executable)").receive() - assert py.std.sys.executable == py.std.sys.executable - gw.exit() - - def test_ssh(self): - sshhost = py.test.config.getvalueorskip("sshhost") - spec = GatewaySpec("ssh:" + sshhost) - gw = spec.makegateway() - p = gw.remote_exec("import os ; channel.send(os.getcwd())").receive() - gw.exit() - - @py.test.mark.xfail("implement socketserver test scenario") - def test_socketgateway(self): - gw = py.execnet.PopenGateway() - spec = GatewaySpec("ssh:" + sshhost) +from py.__.execnet.gwmanage import GatewayManager, HostRSync class TestGatewayManagerPopen: def test_hostmanager_popen_makegateway(self): @@ -191,56 +95,6 @@ class TestGatewayManagerPopen: assert l[0].startswith(curwd) assert l[0].endswith("hello") -from py.__.execnet.gwmanage import MultiChannel -class TestMultiChannel: - def test_multichannel_receive_each(self): - class pseudochannel: - def receive(self): - return 12 - - pc1 = pseudochannel() - pc2 = pseudochannel() - multichannel = MultiChannel([pc1, pc2]) - l = multichannel.receive_each(withchannel=True) - assert len(l) == 2 - assert l == [(pc1, 12), (pc2, 12)] - l = multichannel.receive_each(withchannel=False) - assert l == [12,12] - - def test_multichannel_send_each(self): - gm = GatewayManager(['popen'] * 2) - mc = gm.multi_exec(""" - import os - channel.send(channel.receive() + 1) - """) - mc.send_each(41) - l = mc.receive_each() - assert l == [42,42] - - def test_multichannel_receive_queue(self): - gm = GatewayManager(['popen'] * 2) - mc = gm.multi_exec(""" - import os - channel.send(os.getpid()) - """) - queue = mc.make_receive_queue() - ch, item = queue.get(timeout=10) - ch2, item2 = queue.get(timeout=10) - assert ch != ch2 - assert ch.gateway != ch2.gateway - assert item != item2 - mc.waitclose() - - def test_multichannel_waitclose(self): - l = [] - class pseudochannel: - def waitclose(self): - l.append(0) - multichannel = MultiChannel([pseudochannel(), pseudochannel()]) - multichannel.waitclose() - assert len(l) == 2 - - def pytest_pyfuncarg_source(pyfuncitem): return py.test.ensuretemp(pyfuncitem.getmodpath()).mkdir("source") def pytest_pyfuncarg_dest(pyfuncitem): @@ -262,7 +116,7 @@ class TestHRSync: assert 'somedir' in basenames def test_hrsync_one_host(self, source, dest): - spec = GatewaySpec("popen::%s" % dest) + spec = py.execnet.GatewaySpec("popen::%s" % dest) gw = spec.makegateway() finished = [] rsync = HostRSync(source) diff --git a/py/execnet/testing/test_gwspec.py b/py/execnet/testing/test_gwspec.py new file mode 100644 index 000000000..5ebabbcaf --- /dev/null +++ b/py/execnet/testing/test_gwspec.py @@ -0,0 +1,101 @@ +""" + tests for py.execnet.GatewaySpec +""" + +import py + +class TestGatewaySpec: + """ + socket:hostname:port:path SocketGateway + popen[-executable][:path] PopenGateway + [ssh:]spec:path SshGateway + * [SshGateway] + """ + def test_popen(self): + for python in ('', 'python2.4'): + for joinpath in ('', 'abc', 'ab:cd', '/x/y'): + s = ":".join(["popen", python, joinpath]) + print s + spec = py.execnet.GatewaySpec(s) + assert spec.address == "popen" + assert spec.python == python + assert spec.joinpath == joinpath + assert spec.type == "popen" + spec2 = py.execnet.GatewaySpec("popen" + joinpath) + self._equality(spec, spec2) + + def test_ssh(self): + for prefix in ('ssh', ''): # ssh is default + for hostpart in ('x.y', 'xyz@x.y'): + for python in ('python', 'python2.5'): + for joinpath in ('', 'abc', 'ab:cd', '/tmp'): + specstring = ":".join([prefix, hostpart, python, joinpath]) + if specstring[0] == ":": + specstring = specstring[1:] + print specstring + spec = py.execnet.GatewaySpec(specstring) + assert spec.address == hostpart + assert spec.python == python + if joinpath: + assert spec.joinpath == joinpath + else: + assert spec.joinpath == "pyexecnetcache" + assert spec.type == "ssh" + spec2 = py.execnet.GatewaySpec(specstring) + self._equality(spec, spec2) + + def test_socket(self): + for hostpart in ('x.y', 'x', 'popen'): + for port in ":80", ":1000": + for joinpath in ('', ':abc', ':abc:de'): + spec = py.execnet.GatewaySpec("socket:" + hostpart + port + joinpath) + assert spec.address == (hostpart, int(port[1:])) + if joinpath[1:]: + assert spec.joinpath == joinpath[1:] + else: + assert spec.joinpath == "pyexecnetcache" + assert spec.type == "socket" + spec2 = py.execnet.GatewaySpec("socket:" + hostpart + port + joinpath) + self._equality(spec, spec2) + + def _equality(self, spec1, spec2): + assert spec1 != spec2 + assert hash(spec1) != hash(spec2) + assert not (spec1 == spec2) + + +class TestGatewaySpecAPI: + def test_popen_nopath_makegateway(self, testdir): + spec = py.execnet.GatewaySpec("popen") + gw = spec.makegateway() + p = gw.remote_exec("import os; channel.send(os.getcwd())").receive() + curdir = py.std.os.getcwd() + assert curdir == p + gw.exit() + + def test_popen_makegateway(self, testdir): + spec = py.execnet.GatewaySpec("popen::" + str(testdir.tmpdir)) + gw = spec.makegateway() + p = gw.remote_exec("import os; channel.send(os.getcwd())").receive() + assert spec.joinpath == p + gw.exit() + + def test_popen_makegateway_python(self, testdir): + spec = py.execnet.GatewaySpec("popen:%s" % py.std.sys.executable) + gw = spec.makegateway() + res = gw.remote_exec("import sys ; channel.send(sys.executable)").receive() + assert py.std.sys.executable == py.std.sys.executable + gw.exit() + + def test_ssh(self): + sshhost = py.test.config.getvalueorskip("sshhost") + spec = py.execnet.GatewaySpec("ssh:" + sshhost) + gw = spec.makegateway() + p = gw.remote_exec("import os ; channel.send(os.getcwd())").receive() + gw.exit() + + @py.test.mark.xfail("implement socketserver test scenario") + def test_socketgateway(self): + gw = py.execnet.PopenGateway() + spec = py.execnet.GatewaySpec("ssh:" + sshhost) + diff --git a/py/execnet/testing/test_multi.py b/py/execnet/testing/test_multi.py new file mode 100644 index 000000000..d35dd1e86 --- /dev/null +++ b/py/execnet/testing/test_multi.py @@ -0,0 +1,58 @@ +""" + tests for + - multi channels and multi gateways + +""" + +import py + +class TestMultiChannelAndGateway: + def test_multichannel_receive_each(self): + class pseudochannel: + def receive(self): + return 12 + + pc1 = pseudochannel() + pc2 = pseudochannel() + multichannel = py.execnet.MultiChannel([pc1, pc2]) + l = multichannel.receive_each(withchannel=True) + assert len(l) == 2 + assert l == [(pc1, 12), (pc2, 12)] + l = multichannel.receive_each(withchannel=False) + assert l == [12,12] + + def test_multichannel_send_each(self): + l = [py.execnet.PopenGateway() for x in range(2)] + gm = py.execnet.MultiGateway(l) + mc = gm.remote_exec(""" + import os + channel.send(channel.receive() + 1) + """) + mc.send_each(41) + l = mc.receive_each() + assert l == [42,42] + + def test_multichannel_receive_queue_for_two_subprocesses(self): + l = [py.execnet.PopenGateway() for x in range(2)] + gm = py.execnet.MultiGateway(l) + mc = gm.remote_exec(""" + import os + channel.send(os.getpid()) + """) + queue = mc.make_receive_queue() + ch, item = queue.get(timeout=10) + ch2, item2 = queue.get(timeout=10) + assert ch != ch2 + assert ch.gateway != ch2.gateway + assert item != item2 + mc.waitclose() + + def test_multichannel_waitclose(self): + l = [] + class pseudochannel: + def waitclose(self): + l.append(0) + multichannel = py.execnet.MultiChannel([pseudochannel(), pseudochannel()]) + multichannel.waitclose() + assert len(l) == 2 +