[svn r63869] * moving execnet events to become api plugin calls.

* defining Execnet hooks in an explicit API

--HG--
branch : trunk
This commit is contained in:
hpk
2009-04-08 19:50:14 +02:00
parent 92a06c3787
commit 66c64e6b97
8 changed files with 79 additions and 70 deletions

View File

@@ -20,16 +20,15 @@ class TestGatewayManagerPopen:
for spec in GatewayManager(l, defaultchdir="abc").specs:
assert spec.chdir == "abc"
def test_popen_makegateway_events(self, eventrecorder):
def test_popen_makegateway_events(self, _pytest):
rec = _pytest.getcallrecorder(py.execnet._API)
hm = GatewayManager(["popen"] * 2)
hm.makegateways()
event = eventrecorder.popevent("gwmanage_newgateway")
gw, platinfo = event.args[:2]
assert gw.id == "[1]"
platinfo.executable = gw._rinfo().executable
event = eventrecorder.popevent("gwmanage_newgateway")
gw, platinfo = event.args[:2]
assert gw.id == "[2]"
call = rec.popcall("pyexecnet_gwmanage_newgateway")
assert call.gateway.id == "[1]"
assert call.platinfo.executable == call.gateway._rinfo().executable
call = rec.popcall("pyexecnet_gwmanage_newgateway")
assert call.gateway.id == "[2]"
assert len(hm.gateways) == 2
hm.exit()
assert not len(hm.gateways)
@@ -60,18 +59,17 @@ class TestGatewayManagerPopen:
assert dest.join("dir1", "dir2").check()
assert dest.join("dir1", "dir2", 'hello').check()
def test_hostmanage_rsync_same_popen_twice(self, source, dest, eventrecorder):
def test_hostmanage_rsync_same_popen_twice(self, source, dest, _pytest):
rec = _pytest.getcallrecorder(py.execnet._API)
hm = GatewayManager(["popen//chdir=%s" %dest] * 2)
hm.makegateways()
source.ensure("dir1", "dir2", "hello")
hm.rsync(source)
event = eventrecorder.popevent("gwmanage_rsyncstart")
source2 = event.kwargs['source']
gws = event.kwargs['gateways']
assert source2 == source
assert len(gws) == 1
assert hm.gateways[0] == gws[0]
event = eventrecorder.popevent("gwmanage_rsyncfinish")
call = rec.popcall("pyexecnet_gwmanage_rsyncstart")
assert call.source == source
assert len(call.gateways) == 1
assert hm.gateways[0] == call.gateways[0]
call = rec.popcall("pyexecnet_gwmanage_rsyncfinish")
def test_multi_chdir_popen_with_path(self, testdir):
import os