[svn r63192] rename dsession to dist

--HG--
branch : trunk
This commit is contained in:
hpk
2009-03-21 20:31:09 +01:00
parent fc14b038af
commit 3902890e1b
15 changed files with 29 additions and 29 deletions

1
py/test/dist/__init__.py vendored Normal file
View File

@@ -0,0 +1 @@
#

238
py/test/dist/dsession.py vendored Normal file
View File

@@ -0,0 +1,238 @@
"""
EXPERIMENTAL dsession session (for dist/non-dist unification)
"""
import py
from py.__.test import event
from py.__.test.runner import basic_run_report, basic_collect_report
from py.__.test.session import Session
from py.__.test import outcome
from py.__.test.dist.nodemanage import NodeManager
import Queue
class LoopState(object):
def __init__(self, dsession, colitems):
self.dsession = dsession
self.colitems = colitems
self.exitstatus = None
# loopstate.dowork is False after reschedule events
# because otherwise we might very busily loop
# waiting for a host to become ready.
self.dowork = True
self.shuttingdown = False
self.testsfailed = False
def pyevent_itemtestreport(self, event):
if event.colitem in self.dsession.item2node:
self.dsession.removeitem(event.colitem)
if event.failed:
self.testsfailed = True
def pyevent_collectionreport(self, event):
if event.passed:
self.colitems.extend(event.result)
def pyevent_testnodeready(self, node):
self.dsession.addnode(node)
def pyevent_testnodedown(self, node, error=None):
pending = self.dsession.removenode(node)
if pending:
crashitem = pending[0]
self.dsession.handle_crashitem(crashitem, node)
self.colitems.extend(pending[1:])
def pyevent_rescheduleitems(self, event):
self.colitems.extend(event.items)
self.dowork = False # avoid busywait
class DSession(Session):
"""
Session drives the collection and running of tests
and generates test events for reporters.
"""
MAXITEMSPERHOST = 15
def __init__(self, config):
self.queue = Queue.Queue()
self.node2pending = {}
self.item2node = {}
super(DSession, self).__init__(config=config)
def pytest_configure(self, config):
if self.config.getvalue("usepdb"):
raise self.config.Error("--pdb does not work for distributed tests (yet).")
try:
self.config.getxspecs()
except self.config.Error:
print "Please specify test environments for distribution of tests:"
print "py.test --tx ssh=user@somehost --tx popen//python=python2.5"
print "conftest.py: pytest_option_tx=['ssh=user@somehost','popen']"
print "environment: PYTEST_OPTION_TX=ssh=@somehost,popen"
print
#print "see also: http://codespeak.net/py/current/doc/test.html#automated-distributed-testing"
raise SystemExit
def main(self, colitems=None):
colitems = self.getinitialitems(colitems)
self.sessionstarts()
self.setup()
exitstatus = self.loop(colitems)
self.teardown()
self.sessionfinishes()
return exitstatus
def loop_once(self, loopstate):
if loopstate.shuttingdown:
return self.loop_once_shutdown(loopstate)
colitems = loopstate.colitems
if loopstate.dowork and colitems:
self.triggertesting(loopstate.colitems)
colitems[:] = []
# we use a timeout here so that control-C gets through
while 1:
try:
eventcall = self.queue.get(timeout=2.0)
break
except Queue.Empty:
continue
loopstate.dowork = True
eventname, args, kwargs = eventcall
self.bus.notify(eventname, *args, **kwargs)
# termination conditions
if ((loopstate.testsfailed and self.config.option.exitfirst) or
(not self.item2node and not colitems and not self.queue.qsize())):
self.triggershutdown()
loopstate.shuttingdown = True
elif not self.node2pending:
loopstate.exitstatus = outcome.EXIT_NOHOSTS
def loop_once_shutdown(self, loopstate):
# once we are in shutdown mode we dont send
# events other than HostDown upstream
eventname, args, kwargs = self.queue.get()
if eventname == "testnodedown":
node, error = args[0], args[1]
self.bus.notify("testnodedown", node, error)
self.removenode(node)
if not self.node2pending:
# finished
if loopstate.testsfailed:
loopstate.exitstatus = outcome.EXIT_TESTSFAILED
else:
loopstate.exitstatus = outcome.EXIT_OK
def _initloopstate(self, colitems):
loopstate = LoopState(self, colitems)
self.config.bus.register(loopstate)
return loopstate
def loop(self, colitems):
try:
loopstate = self._initloopstate(colitems)
loopstate.dowork = False # first receive at least one HostUp events
while 1:
self.loop_once(loopstate)
if loopstate.exitstatus is not None:
exitstatus = loopstate.exitstatus
break
except KeyboardInterrupt:
exitstatus = outcome.EXIT_INTERRUPTED
except:
self.bus.notify("internalerror", event.InternalException())
exitstatus = outcome.EXIT_INTERNALERROR
self.config.bus.unregister(loopstate)
if exitstatus == 0 and self._testsfailed:
exitstatus = outcome.EXIT_TESTSFAILED
return exitstatus
def triggershutdown(self):
for node in self.node2pending:
node.shutdown()
def addnode(self, node):
assert node not in self.node2pending
self.node2pending[node] = []
def removenode(self, node):
try:
pending = self.node2pending.pop(node)
except KeyError:
# this happens if we didn't receive a testnodeready event yet
return []
for item in pending:
del self.item2node[item]
return pending
def triggertesting(self, colitems):
colitems = self.filteritems(colitems)
senditems = []
for next in colitems:
if isinstance(next, py.test.collect.Item):
senditems.append(next)
else:
self.bus.notify("collectionstart", event.CollectionStart(next))
self.queueevent("collectionreport", basic_collect_report(next))
self.senditems(senditems)
def queueevent(self, eventname, *args, **kwargs):
self.queue.put((eventname, args, kwargs))
def senditems(self, tosend):
if not tosend:
return
for node, pending in self.node2pending.items():
room = self.MAXITEMSPERHOST - len(pending)
if room > 0:
sending = tosend[:room]
node.sendlist(sending)
for item in sending:
#assert item not in self.item2node, (
# "sending same item %r to multiple "
# "not implemented" %(item,))
self.item2node[item] = node
self.bus.notify("itemstart", item, node)
pending.extend(sending)
tosend[:] = tosend[room:] # update inplace
if not tosend:
break
if tosend:
# we have some left, give it to the main loop
self.queueevent("rescheduleitems", event.RescheduleItems(tosend))
def removeitem(self, item):
if item not in self.item2node:
raise AssertionError(item, self.item2node)
node = self.item2node.pop(item)
self.node2pending[node].remove(item)
#self.config.bus.notify("removeitem", item, host.hostid)
def handle_crashitem(self, item, node):
longrepr = "!!! Node %r crashed during running of test %r" %(node, item)
rep = event.ItemTestReport(item, when="???", excinfo=longrepr)
self.bus.notify("itemtestreport", rep)
def setup(self):
""" setup any neccessary resources ahead of the test run. """
self.nodemanager = NodeManager(self.config)
self.nodemanager.setup_nodes(putevent=self.queue.put)
def teardown(self):
""" teardown any resources after a test run. """
self.nodemanager.teardown_nodes()
# debugging function
def dump_picklestate(item):
l = []
while 1:
state = item.__getstate__()
l.append(state)
item = state[-1]
if len(state) != 2:
break
return l

175
py/test/dist/mypickle.py vendored Normal file
View File

@@ -0,0 +1,175 @@
"""
Pickling support for two processes that want to exchange
*immutable* object instances. Immutable in the sense
that the receiving side of an object can modify its
copy but when it sends it back the original sending
side will continue to see its unmodified version
(and no actual state will go over the wire).
This module also implements an experimental
execnet pickling channel using this idea.
"""
from cStringIO import StringIO
from pickle import Pickler, Unpickler
import py
from py.__.execnet.channel import Channel
import os
#debug = open("log-mypickle-%d" % os.getpid(), 'w')
class MyPickler(Pickler):
""" Pickler with a custom memoize()
to take care of unique ID creation.
See the usage in ImmutablePickler
XXX we could probably extend Pickler
and Unpickler classes to directly
update the other'S memos.
"""
def __init__(self, file, protocol, uneven):
Pickler.__init__(self, file, protocol)
self.uneven = uneven
def memoize(self, obj):
if self.fast:
return
assert id(obj) not in self.memo
memo_len = len(self.memo)
key = memo_len * 2 + self.uneven
self.write(self.put(key))
self.memo[id(obj)] = key, obj
class ImmutablePickler:
def __init__(self, uneven, protocol=0):
""" ImmutablePicklers are instantiated in Pairs.
The two sides need to create unique IDs
while pickling their objects. This is
done by using either even or uneven
numbers, depending on the instantiation
parameter.
"""
self._picklememo = {}
self._unpicklememo = {}
self._protocol = protocol
self.uneven = uneven and 1 or 0
def selfmemoize(self, obj):
# this is for feeding objects to ourselfes
# which be the case e.g. if you want to pickle
# from a forked process back to the original
f = StringIO()
pickler = MyPickler(f, self._protocol, uneven=self.uneven)
pickler.memo = self._picklememo
pickler.memoize(obj)
self._updateunpicklememo()
def dumps(self, obj):
f = StringIO()
pickler = MyPickler(f, self._protocol, uneven=self.uneven)
pickler.memo = self._picklememo
pickler.dump(obj)
self._updateunpicklememo()
#print >>debug, "dumped", obj
#print >>debug, "picklememo", self._picklememo
return f.getvalue()
def loads(self, string):
f = StringIO(string)
unpickler = Unpickler(f)
unpickler.memo = self._unpicklememo
res = unpickler.load()
self._updatepicklememo()
#print >>debug, "loaded", res
#print >>debug, "unpicklememo", self._unpicklememo
return res
def _updatepicklememo(self):
for x, obj in self._unpicklememo.items():
self._picklememo[id(obj)] = (int(x), obj)
def _updateunpicklememo(self):
for key,obj in self._picklememo.values():
key = str(key)
if key in self._unpicklememo:
assert self._unpicklememo[key] is obj
self._unpicklememo[key] = obj
NO_ENDMARKER_WANTED = object()
class UnpickleError(Exception):
""" Problems while unpickling. """
def __init__(self, formatted):
self.formatted = formatted
Exception.__init__(self, formatted)
def __str__(self):
return self.formatted
class PickleChannel(object):
""" PickleChannels wrap execnet channels
and allow to send/receive by using
"immutable pickling".
"""
_unpicklingerror = None
def __init__(self, channel):
self._channel = channel
# we use the fact that each side of a
# gateway connection counts with uneven
# or even numbers depending on which
# side it is (for the purpose of creating
# unique ids - which is what we need it here for)
uneven = channel.gateway._channelfactory.count % 2
self._ipickle = ImmutablePickler(uneven=uneven)
self.RemoteError = channel.RemoteError
def send(self, obj):
if not isinstance(obj, Channel):
pickled_obj = self._ipickle.dumps(obj)
self._channel.send(pickled_obj)
else:
self._channel.send(obj)
def receive(self):
pickled_obj = self._channel.receive()
return self._unpickle(pickled_obj)
def _unpickle(self, pickled_obj):
if isinstance(pickled_obj, self._channel.__class__):
return pickled_obj
return self._ipickle.loads(pickled_obj)
def _getremoteerror(self):
return self._unpicklingerror or self._channel._getremoteerror()
def close(self):
return self._channel.close()
def isclosed(self):
return self._channel.isclosed()
def waitclose(self, timeout=None):
return self._channel.waitclose(timeout=timeout)
def setcallback(self, callback, endmarker=NO_ENDMARKER_WANTED):
if endmarker is NO_ENDMARKER_WANTED:
def unpickle_callback(pickled_obj):
obj = self._unpickle(pickled_obj)
callback(obj)
self._channel.setcallback(unpickle_callback)
return
uniqueendmarker = object()
def unpickle_callback(pickled_obj):
if pickled_obj is uniqueendmarker:
return callback(endmarker)
try:
obj = self._unpickle(pickled_obj)
except KeyboardInterrupt:
raise
except:
excinfo = py.code.ExceptionInfo()
formatted = str(excinfo.getrepr(showlocals=True,funcargs=True))
self._unpicklingerror = UnpickleError(formatted)
callback(endmarker)
else:
callback(obj)
self._channel.setcallback(unpickle_callback, uniqueendmarker)

90
py/test/dist/nodemanage.py vendored Normal file
View File

@@ -0,0 +1,90 @@
import py
import sys, os
from py.__.test.dist.txnode import MasterNode
from py.__.execnet.gwmanage import GatewayManager
class NodeManager(object):
def __init__(self, config, specs=None):
self.config = config
if specs is None:
specs = self.config.getxspecs()
self.roots = self.config.getrsyncdirs()
self.gwmanager = GatewayManager(specs)
self.nodes = []
def trace(self, msg):
self.config.bus.notify("trace", "nodemanage", msg)
def trace_nodestatus(self):
if self.config.option.debug:
for ch, result in self.gwmanager.multi_exec("""
import sys, os
channel.send((sys.executable, os.getcwd(), sys.path))
""").receive_each(withchannel=True):
self.trace("spec %r, execuable %r, cwd %r, syspath %r" %(
ch.gateway.spec, result[0], result[1], result[2]))
def config_getignores(self):
return self.config.getconftest_pathlist("rsyncignore")
def rsync_roots(self):
""" make sure that all remote gateways
have the same set of roots in their
current directory.
"""
self.makegateways()
options = {
'ignores': self.config_getignores(),
'verbose': self.config.option.verbose,
}
if self.roots:
# send each rsync root
for root in self.roots:
self.gwmanager.rsync(root, **options)
else:
XXX # do we want to care for situations without explicit rsyncdirs?
# we transfer our topdir as the root
self.gwmanager.rsync(self.config.topdir, **options)
# and cd into it
self.gwmanager.multi_chdir(self.config.topdir.basename, inplacelocal=False)
def makegateways(self):
# we change to the topdir sot that
# PopenGateways will have their cwd
# such that unpickling configs will
# pick it up as the right topdir
# (for other gateways this chdir is irrelevant)
old = self.config.topdir.chdir()
try:
self.gwmanager.makegateways()
finally:
old.chdir()
self.trace_nodestatus()
def setup_nodes(self, putevent):
self.rsync_roots()
nice = self.config.getvalue("dist_nicelevel")
if nice != 0:
self.gwmanager.multi_exec("""
import os
if hasattr(os, 'nice'):
os.nice(%r)
""" % nice).waitclose()
self.trace_nodestatus()
multigw = self.gwmanager.getgateways(inplacelocal=False, remote=True)
multigw.remote_exec("""
import os, sys
sys.path.insert(0, os.getcwd())
""").waitclose()
for gateway in self.gwmanager.gateways:
node = MasterNode(gateway, self.config, putevent)
self.nodes.append(node)
def teardown_nodes(self):
# XXX teardown nodes?
self.gwmanager.exit()

1
py/test/dist/testing/__init__.py vendored Normal file
View File

@@ -0,0 +1 @@
#

336
py/test/dist/testing/test_dsession.py vendored Normal file
View File

@@ -0,0 +1,336 @@
from py.__.test.dist.dsession import DSession
from py.__.test.runner import basic_collect_report
from py.__.test import event
from py.__.test import outcome
import py
XSpec = py.execnet.XSpec
def run(item):
runner = item._getrunner()
return runner(item)
class MockNode:
def __init__(self):
self.sent = []
def sendlist(self, items):
self.sent.append(items)
def shutdown(self):
self._shutdown=True
def dumpqueue(queue):
while queue.qsize():
print queue.get()
class TestDSession:
def test_add_remove_node(self, testdir):
item = testdir.getitem("def test_func(): pass")
rep = run(item)
session = DSession(item.config)
node = MockNode()
assert not session.node2pending
session.addnode(node)
assert len(session.node2pending) == 1
session.senditems([item])
pending = session.removenode(node)
assert pending == [item]
assert item not in session.item2node
l = session.removenode(node)
assert not l
def test_senditems_removeitems(self, testdir):
item = testdir.getitem("def test_func(): pass")
rep = run(item)
session = DSession(item.config)
node = MockNode()
session.addnode(node)
session.senditems([item])
assert session.node2pending[node] == [item]
assert session.item2node[item] == node
session.removeitem(item)
assert not session.node2pending[node]
assert not session.item2node
def test_triggertesting_collect(self, testdir):
modcol = testdir.getmodulecol("""
def test_func():
pass
""")
session = DSession(modcol.config)
session.triggertesting([modcol])
name, args, kwargs = session.queue.get(block=False)
assert name == 'collectionreport'
rep, = args
assert len(rep.result) == 1
def test_triggertesting_item(self, testdir):
item = testdir.getitem("def test_func(): pass")
session = DSession(item.config)
node1 = MockNode()
node2 = MockNode()
session.addnode(node1)
session.addnode(node2)
session.triggertesting([item] * (session.MAXITEMSPERHOST*2 + 1))
sent1 = node1.sent[0]
sent2 = node2.sent[0]
assert sent1 == [item] * session.MAXITEMSPERHOST
assert sent2 == [item] * session.MAXITEMSPERHOST
assert session.node2pending[node1] == sent1
assert session.node2pending[node2] == sent2
name, args, kwargs = session.queue.get(block=False)
assert name == "rescheduleitems"
ev, = args
assert ev.items == [item]
def test_keyboardinterrupt(self, testdir):
item = testdir.getitem("def test_func(): pass")
session = DSession(item.config)
def raise_(timeout=None): raise KeyboardInterrupt()
session.queue.get = raise_
exitstatus = session.loop([])
assert exitstatus == outcome.EXIT_INTERRUPTED
def test_internalerror(self, testdir):
item = testdir.getitem("def test_func(): pass")
session = DSession(item.config)
def raise_(): raise ValueError()
session.queue.get = raise_
exitstatus = session.loop([])
assert exitstatus == outcome.EXIT_INTERNALERROR
def test_rescheduleevent(self, testdir):
item = testdir.getitem("def test_func(): pass")
session = DSession(item.config)
node = MockNode()
session.addnode(node)
ev = event.RescheduleItems([item])
loopstate = session._initloopstate([])
session.queueevent("rescheduleitems", ev)
session.loop_once(loopstate)
# check that RescheduleEvents are not immediately
# rescheduled if there are no nodes
assert loopstate.dowork == False
session.queueevent("anonymous", event.NOP())
session.loop_once(loopstate)
session.queueevent("anonymous", event.NOP())
session.loop_once(loopstate)
assert node.sent == [[item]]
session.queueevent("itemtestreport", run(item))
session.loop_once(loopstate)
assert loopstate.shuttingdown
assert not loopstate.testsfailed
def test_no_node_remaining_for_tests(self, testdir):
item = testdir.getitem("def test_func(): pass")
# setup a session with one node
session = DSession(item.config)
node = MockNode()
session.addnode(node)
# setup a HostDown event
session.queueevent("testnodedown", node, None)
loopstate = session._initloopstate([item])
loopstate.dowork = False
session.loop_once(loopstate)
dumpqueue(session.queue)
assert loopstate.exitstatus == outcome.EXIT_NOHOSTS
def test_testnodedown_causes_reschedule_pending(self, testdir, EventRecorder):
modcol = testdir.getmodulecol("""
def test_crash():
assert 0
def test_fail():
x
""")
item1, item2 = modcol.collect()
# setup a session with two nodes
session = DSession(item1.config)
node1, node2 = MockNode(), MockNode()
session.addnode(node1)
session.addnode(node2)
# have one test pending for a node that goes down
session.senditems([item1, item2])
node = session.item2node[item1]
session.queueevent("testnodedown", node, None)
evrec = EventRecorder(session.bus)
print session.item2node
loopstate = session._initloopstate([])
session.loop_once(loopstate)
assert loopstate.colitems == [item2] # do not reschedule crash item
testrep = evrec.getfirstnamed("itemtestreport")
assert testrep.failed
assert testrep.colitem == item1
assert str(testrep.longrepr).find("crashed") != -1
#assert str(testrep.longrepr).find(node.gateway.spec) != -1
def test_testnodeready_adds_to_available(self, testdir):
item = testdir.getitem("def test_func(): pass")
# setup a session with two nodes
session = DSession(item.config)
node1 = MockNode()
session.queueevent("testnodeready", node1)
loopstate = session._initloopstate([item])
loopstate.dowork = False
assert len(session.node2pending) == 0
session.loop_once(loopstate)
assert len(session.node2pending) == 1
def test_event_propagation(self, testdir, EventRecorder):
item = testdir.getitem("def test_func(): pass")
session = DSession(item.config)
evrec = EventRecorder(session.bus)
session.queueevent("NOPevent", 42)
session.loop_once(session._initloopstate([]))
assert evrec.getfirstnamed('NOPevent')
def runthrough(self, item):
session = DSession(item.config)
node = MockNode()
session.addnode(node)
loopstate = session._initloopstate([item])
session.queueevent("NOP")
session.loop_once(loopstate)
assert node.sent == [[item]]
ev = run(item)
session.queueevent("itemtestreport", ev)
session.loop_once(loopstate)
assert loopstate.shuttingdown
session.queueevent("testnodedown", node, None)
session.loop_once(loopstate)
dumpqueue(session.queue)
return session, loopstate.exitstatus
def test_exit_completed_tests_ok(self, testdir):
item = testdir.getitem("def test_func(): pass")
session, exitstatus = self.runthrough(item)
assert exitstatus == outcome.EXIT_OK
def test_exit_completed_tests_fail(self, testdir):
item = testdir.getitem("def test_func(): 0/0")
session, exitstatus = self.runthrough(item)
assert exitstatus == outcome.EXIT_TESTSFAILED
def test_exit_on_first_failing(self, testdir):
modcol = testdir.getmodulecol("""
def test_fail():
assert 0
def test_pass():
pass
""")
modcol.config.option.exitfirst = True
session = DSession(modcol.config)
node = MockNode()
session.addnode(node)
items = basic_collect_report(modcol).result
# trigger testing - this sends tests to the node
session.triggertesting(items)
# run tests ourselves and produce reports
ev1 = run(items[0])
ev2 = run(items[1])
session.queueevent("itemtestreport", ev1) # a failing one
session.queueevent("itemtestreport", ev2)
# now call the loop
loopstate = session._initloopstate(items)
session.loop_once(loopstate)
assert loopstate.testsfailed
assert loopstate.shuttingdown
def test_shuttingdown_filters_events(self, testdir, EventRecorder):
item = testdir.getitem("def test_func(): pass")
session = DSession(item.config)
node = MockNode()
session.addnode(node)
loopstate = session._initloopstate([])
loopstate.shuttingdown = True
evrec = EventRecorder(session.bus)
session.queueevent("itemtestreport", run(item))
session.loop_once(loopstate)
assert not evrec.getfirstnamed("testnodedown")
session.queueevent("testnodedown", node, None)
session.loop_once(loopstate)
assert evrec.getfirstnamed('testnodedown') == node
def test_filteritems(self, testdir, EventRecorder):
modcol = testdir.getmodulecol("""
def test_fail():
assert 0
def test_pass():
pass
""")
session = DSession(modcol.config)
modcol.config.option.keyword = "nothing"
dsel = session.filteritems([modcol])
assert dsel == [modcol]
items = modcol.collect()
evrec = EventRecorder(session.bus)
remaining = session.filteritems(items)
assert remaining == []
event = evrec.events[-1]
assert event.name == "deselected"
assert event.args[0].items == items
modcol.config.option.keyword = "test_fail"
remaining = session.filteritems(items)
assert remaining == [items[0]]
event = evrec.events[-1]
assert event.name == "deselected"
assert event.args[0].items == [items[1]]
def test_testnodedown_shutdown_after_completion(self, testdir):
item = testdir.getitem("def test_func(): pass")
session = DSession(item.config)
node = MockNode()
session.addnode(node)
session.senditems([item])
session.queueevent("itemtestreport", run(item))
loopstate = session._initloopstate([])
session.loop_once(loopstate)
assert node._shutdown is True
assert loopstate.exitstatus is None, "loop did not wait for testnodedown"
assert loopstate.shuttingdown
session.queueevent("testnodedown", node, None)
session.loop_once(loopstate)
assert loopstate.exitstatus == 0
def test_nopending_but_collection_remains(self, testdir):
modcol = testdir.getmodulecol("""
def test_fail():
assert 0
def test_pass():
pass
""")
session = DSession(modcol.config)
node = MockNode()
session.addnode(node)
colreport = basic_collect_report(modcol)
item1, item2 = colreport.result
session.senditems([item1])
# node2pending will become empty when the loop sees the report
session.queueevent("itemtestreport", run(item1))
# but we have a collection pending
session.queueevent("collectionreport", colreport)
loopstate = session._initloopstate([])
session.loop_once(loopstate)
assert loopstate.exitstatus is None, "loop did not care for collection report"
assert not loopstate.colitems
session.loop_once(loopstate)
assert loopstate.colitems == colreport.result
assert loopstate.exitstatus is None, "loop did not care for colitems"

View File

@@ -0,0 +1,94 @@
import py
from py.__.test.dist.dsession import DSession
from test_txnode import EventQueue
class TestAsyncFunctional:
def test_conftest_options(self, testdir):
p1 = testdir.tmpdir.ensure("dir", 'p1.py')
p1.dirpath("__init__.py").write("")
p1.dirpath("conftest.py").write(py.code.Source("""
print "importing conftest", __file__
import py
Option = py.test.config.Option
option = py.test.config.addoptions("someopt",
Option('--someopt', action="store_true", dest="someopt", default=False))
dist_rsync_roots = ['../dir']
print "added options", option
print "config file seen from conftest", py.test.config
"""))
p1.write(py.code.Source("""
import py, conftest
def test_1():
print "config from test_1", py.test.config
print "conftest from test_1", conftest.__file__
print "test_1: py.test.config.option.someopt", py.test.config.option.someopt
print "test_1: conftest", conftest
print "test_1: conftest.option.someopt", conftest.option.someopt
assert conftest.option.someopt
"""))
result = testdir.runpytest('-n1', p1, '--someopt')
assert result.ret == 0
extra = result.stdout.fnmatch_lines([
"*1 passed*",
])
def test_dist_some_tests(self, testdir):
p1 = testdir.makepyfile(test_one="""
def test_1():
pass
def test_x():
import py
py.test.skip("aaa")
def test_fail():
assert 0
""")
config = testdir.parseconfig('-d', p1, '--tx=popen')
dsession = DSession(config)
eq = EventQueue(config.bus)
dsession.main([config.getfsnode(p1)])
ev, = eq.geteventargs("itemtestreport")
assert ev.passed
ev, = eq.geteventargs("itemtestreport")
assert ev.skipped
ev, = eq.geteventargs("itemtestreport")
assert ev.failed
# see that the node is really down
node, error = eq.geteventargs("testnodedown")
assert node.gateway.spec.popen
eq.geteventargs("testrunfinish")
def test_distribution_rsyncdirs_example(self, testdir):
source = testdir.mkdir("source")
dest = testdir.mkdir("dest")
subdir = source.mkdir("example_pkg")
subdir.ensure("__init__.py")
p = subdir.join("test_one.py")
p.write("def test_5(): assert not __file__.startswith(%r)" % str(p))
result = testdir.runpytest("-d", "--rsyncdirs=%(subdir)s" % locals(),
"--tx=popen//chdir=%(dest)s" % locals(), p)
assert result.ret == 0
result.stdout.fnmatch_lines([
"*1* new *popen*platform*",
#"RSyncStart: [G1]",
#"RSyncFinished: [G1]",
"*1 passed*"
])
assert dest.join(subdir.basename).check(dir=1)
def test_nice_level(self, testdir):
""" Tests if nice level behaviour is ok """
import os
if not hasattr(os, 'nice'):
py.test.skip("no os.nice() available")
testdir.makepyfile(conftest="""
dist_nicelevel = 10
""")
p1 = testdir.makepyfile("""
def test_nice():
import os
assert os.nice(0) == 10
""")
evrec = testdir.inline_run('-d', p1, '--tx=popen')
ev = evrec.getfirstnamed('itemtestreport')
assert ev.passed

214
py/test/dist/testing/test_mypickle.py vendored Normal file
View File

@@ -0,0 +1,214 @@
import py
from py.__.test.dist.mypickle import ImmutablePickler, PickleChannel, UnpickleError
class A:
pass
def test_pickle_and_back_IS_same():
def pickle_band_back_IS_same(obj, proto):
p1 = ImmutablePickler(uneven=False, protocol=proto)
p2 = ImmutablePickler(uneven=True, protocol=proto)
s1 = p1.dumps(obj)
d2 = p2.loads(s1)
s2 = p2.dumps(d2)
obj_back = p1.loads(s2)
assert obj is obj_back
a1 = A()
a2 = A()
a2.a1 = a1
for proto in (0,1,2, -1):
for obj in {1:2}, [1,2,3], a1, a2:
yield pickle_band_back_IS_same, obj, proto
def test_pickling_twice_before_unpickling():
p1 = ImmutablePickler(uneven=False)
p2 = ImmutablePickler(uneven=True)
a1 = A()
a2 = A()
a3 = A()
a3.a1 = a1
a2.a1 = a1
s1 = p1.dumps(a1)
a1.a3 = a3
s2 = p1.dumps(a2)
other_a1 = p2.loads(s1)
other_a2 = p2.loads(s2)
back_a1 = p1.loads(p2.dumps(other_a1))
other_a3 = p2.loads(p1.dumps(a3))
back_a3 = p1.loads(p2.dumps(other_a3))
back_a2 = p1.loads(p2.dumps(other_a2))
back_a1 = p1.loads(p2.dumps(other_a1))
assert back_a1 is a1
assert back_a2 is a2
def test_pickling_concurrently():
p1 = ImmutablePickler(uneven=False)
p2 = ImmutablePickler(uneven=True)
a1 = A()
a1.hasattr = 42
a2 = A()
s1 = p1.dumps(a1)
s2 = p2.dumps(a2)
other_a1 = p2.loads(s1)
other_a2 = p1.loads(s2)
a1_back = p1.loads(p2.dumps(other_a1))
def test_self_memoize():
p1 = ImmutablePickler(uneven=False)
a1 = A()
p1.selfmemoize(a1)
x = p1.loads(p1.dumps(a1))
assert x is a1
TESTTIMEOUT = 2.0
class TestPickleChannelFunctional:
def setup_class(cls):
cls.gw = py.execnet.PopenGateway()
cls.gw.remote_init_threads(5)
def teardown_class(cls):
cls.gw.exit()
def test_popen_send_instance(self):
channel = self.gw.remote_exec("""
from py.__.test.dist.mypickle import PickleChannel
channel = PickleChannel(channel)
from py.__.test.dist.testing.test_mypickle import A
a1 = A()
a1.hello = 10
channel.send(a1)
a2 = channel.receive()
channel.send(a2 is a1)
""")
channel = PickleChannel(channel)
a_received = channel.receive()
assert isinstance(a_received, A)
assert a_received.hello == 10
channel.send(a_received)
remote_a2_is_a1 = channel.receive()
assert remote_a2_is_a1
def test_send_concurrent(self):
channel = self.gw.remote_exec("""
from py.__.test.dist.mypickle import PickleChannel
channel = PickleChannel(channel)
from py.__.test.dist.testing.test_mypickle import A
l = [A() for i in range(10)]
channel.send(l)
other_l = channel.receive()
channel.send((l, other_l))
channel.send(channel.receive())
channel.receive()
""")
channel = PickleChannel(channel)
l = [A() for i in range(10)]
channel.send(l)
other_l = channel.receive()
channel.send(other_l)
ret = channel.receive()
assert ret[0] is other_l
assert ret[1] is l
back = channel.receive()
assert other_l is other_l
channel.send(None)
#s1 = p1.dumps(a1)
#s2 = p2.dumps(a2)
#other_a1 = p2.loads(s1)
#other_a2 = p1.loads(s2)
#a1_back = p1.loads(p2.dumps(other_a1))
def test_popen_with_callback(self):
channel = self.gw.remote_exec("""
from py.__.test.dist.mypickle import PickleChannel
channel = PickleChannel(channel)
from py.__.test.dist.testing.test_mypickle import A
a1 = A()
a1.hello = 10
channel.send(a1)
a2 = channel.receive()
channel.send(a2 is a1)
""")
channel = PickleChannel(channel)
queue = py.std.Queue.Queue()
channel.setcallback(queue.put)
a_received = queue.get(timeout=TESTTIMEOUT)
assert isinstance(a_received, A)
assert a_received.hello == 10
channel.send(a_received)
#remote_a2_is_a1 = queue.get(timeout=TESTTIMEOUT)
#assert remote_a2_is_a1
def test_popen_with_callback_with_endmarker(self):
channel = self.gw.remote_exec("""
from py.__.test.dist.mypickle import PickleChannel
channel = PickleChannel(channel)
from py.__.test.dist.testing.test_mypickle import A
a1 = A()
a1.hello = 10
channel.send(a1)
a2 = channel.receive()
channel.send(a2 is a1)
""")
channel = PickleChannel(channel)
queue = py.std.Queue.Queue()
channel.setcallback(queue.put, endmarker=-1)
a_received = queue.get(timeout=TESTTIMEOUT)
assert isinstance(a_received, A)
assert a_received.hello == 10
channel.send(a_received)
remote_a2_is_a1 = queue.get(timeout=TESTTIMEOUT)
assert remote_a2_is_a1
endmarker = queue.get(timeout=TESTTIMEOUT)
assert endmarker == -1
def test_popen_with_callback_with_endmarker_and_unpickling_error(self):
channel = self.gw.remote_exec("""
from py.__.test.dist.mypickle import PickleChannel
channel = PickleChannel(channel)
from py.__.test.dist.testing.test_mypickle import A
a1 = A()
channel.send(a1)
channel.send(a1)
""")
channel = PickleChannel(channel)
queue = py.std.Queue.Queue()
a = channel.receive()
channel._ipickle._unpicklememo.clear()
channel.setcallback(queue.put, endmarker=-1)
next = queue.get(timeout=TESTTIMEOUT)
assert next == -1
error = channel._getremoteerror()
assert isinstance(error, UnpickleError)
def test_popen_with_newchannel(self):
channel = self.gw.remote_exec("""
from py.__.test.dist.mypickle import PickleChannel
channel = PickleChannel(channel)
newchannel = channel.receive()
newchannel.send(42)
""")
channel = PickleChannel(channel)
newchannel = self.gw.newchannel()
channel.send(newchannel)
channel.waitclose()
res = newchannel.receive()
assert res == 42
def test_popen_with_various_methods(self):
channel = self.gw.remote_exec("""
from py.__.test.dist.mypickle import PickleChannel
channel = PickleChannel(channel)
channel.receive()
""")
channel = PickleChannel(channel)
assert not channel.isclosed()
assert not channel._getremoteerror()
channel.send(2)
channel.waitclose(timeout=2)

155
py/test/dist/testing/test_nodemanage.py vendored Normal file
View File

@@ -0,0 +1,155 @@
""" RSync filter test
"""
import py
from py.__.test.dist.nodemanage import NodeManager
from py.__.test import event
def pytest_pyfuncarg_source(pyfuncitem):
return py.test.ensuretemp(pyfuncitem.getmodpath()).mkdir("source")
def pytest_pyfuncarg_dest(pyfuncitem):
dest = py.test.ensuretemp(pyfuncitem.getmodpath()).mkdir("dest")
return dest
class TestNodeManager:
@py.test.mark.xfail("consider / forbid implicit rsyncdirs?")
def test_rsync_roots_no_roots(self, source, dest):
source.ensure("dir1", "file1").write("hello")
config = py.test.config._reparse([source])
nodemanager = NodeManager(config, ["popen//chdir=%s" % dest])
assert nodemanager.config.topdir == source == config.topdir
nodemanager.rsync_roots()
p, = nodemanager.gwmanager.multi_exec("import os ; channel.send(os.getcwd())").receive_each()
p = py.path.local(p)
print "remote curdir", p
assert p == dest.join(config.topdir.basename)
assert p.join("dir1").check()
assert p.join("dir1", "file1").check()
def test_popen_rsync_subdir(self, testdir, source, dest):
dir1 = source.mkdir("dir1")
dir2 = dir1.mkdir("dir2")
dir2.ensure("hello")
for rsyncroot in (dir1, source):
dest.remove()
nodemanager = NodeManager(testdir.parseconfig(
"--tx", "popen//chdir=%s" % dest,
"--rsyncdirs", rsyncroot,
source,
))
assert nodemanager.config.topdir == source
nodemanager.rsync_roots()
if rsyncroot == source:
dest = dest.join("source")
assert dest.join("dir1").check()
assert dest.join("dir1", "dir2").check()
assert dest.join("dir1", "dir2", 'hello').check()
nodemanager.gwmanager.exit()
def test_init_rsync_roots(self, source, dest):
dir2 = source.ensure("dir1", "dir2", dir=1)
source.ensure("dir1", "somefile", dir=1)
dir2.ensure("hello")
source.ensure("bogusdir", "file")
source.join("conftest.py").write(py.code.Source("""
rsyncdirs = ['dir1/dir2']
"""))
session = py.test.config._reparse([source]).initsession()
nodemanager = NodeManager(session.config, ["popen//chdir=%s" % dest])
nodemanager.rsync_roots()
assert dest.join("dir2").check()
assert not dest.join("dir1").check()
assert not dest.join("bogus").check()
def test_rsyncignore(self, source, dest):
dir2 = source.ensure("dir1", "dir2", dir=1)
dir5 = source.ensure("dir5", "dir6", "bogus")
dirf = source.ensure("dir5", "file")
dir2.ensure("hello")
source.join("conftest.py").write(py.code.Source("""
rsyncdirs = ['dir1', 'dir5']
rsyncignore = ['dir1/dir2', 'dir5/dir6']
"""))
session = py.test.config._reparse([source]).initsession()
nodemanager = NodeManager(session.config,
["popen//chdir=%s" % dest])
nodemanager.rsync_roots()
assert dest.join("dir1").check()
assert not dest.join("dir1", "dir2").check()
assert dest.join("dir5","file").check()
assert not dest.join("dir6").check()
def test_optimise_popen(self, source, dest):
specs = ["popen"] * 3
source.join("conftest.py").write("rsyncdirs = ['a']")
source.ensure('a', dir=1)
config = py.test.config._reparse([source])
nodemanager = NodeManager(config, specs)
nodemanager.rsync_roots()
for gwspec in nodemanager.gwmanager.specs:
assert gwspec._samefilesystem()
assert not gwspec.chdir
def test_setup_DEBUG(self, source, EventRecorder):
specs = ["popen"] * 2
source.join("conftest.py").write("rsyncdirs = ['a']")
source.ensure('a', dir=1)
config = py.test.config._reparse([source, '--debug'])
assert config.option.debug
nodemanager = NodeManager(config, specs)
evrec = EventRecorder(config.bus, debug=True)
nodemanager.setup_nodes(putevent=[].append)
for spec in nodemanager.gwmanager.specs:
l = evrec.getnamed("trace")
print evrec.events
assert l
nodemanager.teardown_nodes()
def test_ssh_setup_nodes(self, specssh, testdir):
testdir.makepyfile(__init__="", test_x="""
def test_one():
pass
""")
sorter = testdir.inline_run("-d", "--rsyncdirs=%s" % testdir.tmpdir,
"--tx=%s" % specssh, testdir.tmpdir)
ev = sorter.getfirstnamed("itemtestreport")
assert ev.passed
class TestOptionsAndConfiguration:
def test_getxspecs_numprocesses(self, testdir):
config = testdir.parseconfig("-n3")
xspecs = config.getxspecs()
assert len(xspecs) == 3
def test_getxspecs(self, testdir):
testdir.chdir()
config = testdir.parseconfig("--tx=popen", "--tx", "ssh=xyz")
xspecs = config.getxspecs()
assert len(xspecs) == 2
print xspecs
assert xspecs[0].popen
assert xspecs[1].ssh == "xyz"
def test_getconfigroots(self, testdir):
config = testdir.parseconfig('--rsyncdirs=' + str(testdir.tmpdir))
roots = config.getrsyncdirs()
assert len(roots) == 1 + 1
assert testdir.tmpdir in roots
def test_getconfigroots_with_conftest(self, testdir):
testdir.chdir()
p = py.path.local()
for bn in 'x y z'.split():
p.mkdir(bn)
testdir.makeconftest("""
rsyncdirs= 'x',
""")
config = testdir.parseconfig(testdir.tmpdir, '--rsyncdirs=y,z')
roots = config.getrsyncdirs()
assert len(roots) == 3 + 1
assert py.path.local('y') in roots
assert py.path.local('z') in roots
assert testdir.tmpdir.join('x') in roots

145
py/test/dist/testing/test_txnode.py vendored Normal file
View File

@@ -0,0 +1,145 @@
import py
from py.__.test.dist.txnode import MasterNode
class EventQueue:
def __init__(self, bus, queue=None):
if queue is None:
queue = py.std.Queue.Queue()
self.queue = queue
bus.register(self)
def pyevent(self, eventname, *args, **kwargs):
self.queue.put((eventname, args, kwargs))
def geteventargs(self, eventname, timeout=2.0):
events = []
while 1:
try:
eventcall = self.queue.get(timeout=timeout)
except py.std.Queue.Empty:
#print "node channel", self.node.channel
#print "remoteerror", self.node.channel._getremoteerror()
print "seen events", events
raise IOError("did not see %r events" % (eventname))
else:
name, args, kwargs = eventcall
assert isinstance(name, str)
if name == eventname:
return args
events.append(name)
class MySetup:
def __init__(self, pyfuncitem):
self.pyfuncitem = pyfuncitem
def geteventargs(self, eventname, timeout=2.0):
eq = EventQueue(self.config.bus, self.queue)
return eq.geteventargs(eventname, timeout=timeout)
def makenode(self, config=None):
if config is None:
config = py.test.config._reparse([])
self.config = config
self.queue = py.std.Queue.Queue()
self.xspec = py.execnet.XSpec("popen")
self.gateway = py.execnet.makegateway(self.xspec)
self.node = MasterNode(self.gateway, self.config, putevent=self.queue.put)
assert not self.node.channel.isclosed()
return self.node
def finalize(self):
if hasattr(self, 'node'):
gw = self.node.gateway
print "exiting:", gw
gw.exit()
def pytest_pyfuncarg_mysetup(pyfuncitem):
mysetup = MySetup(pyfuncitem)
pyfuncitem.addfinalizer(mysetup.finalize)
return mysetup
def pytest_pyfuncarg_testdir(__call__, pyfuncitem):
# decorate to make us always change to testdir
testdir = __call__.execute(firstresult=True)
testdir.chdir()
return testdir
def test_node_hash_equality(mysetup):
node = mysetup.makenode()
node2 = mysetup.makenode()
assert node != node2
assert node == node
assert not (node != node)
class TestMasterSlaveConnection:
def test_crash_invalid_item(self, mysetup):
node = mysetup.makenode()
node.send(123) # invalid item
n, error = mysetup.geteventargs("testnodedown")
assert n is node
assert str(error).find("AttributeError") != -1
def test_crash_killed(self, testdir, mysetup):
if not hasattr(py.std.os, 'kill'):
py.test.skip("no os.kill")
item = testdir.getitem("""
def test_func():
import os
os.kill(os.getpid(), 15)
""")
node = mysetup.makenode(item.config)
node.send(item)
n, error = mysetup.geteventargs("testnodedown")
assert n is node
assert str(error).find("Not properly terminated") != -1
def test_node_down(self, mysetup):
node = mysetup.makenode()
node.shutdown()
n, error = mysetup.geteventargs("testnodedown")
assert n is node
assert not error
node.callback(node.ENDMARK)
excinfo = py.test.raises(IOError,
"mysetup.geteventargs('testnodedown', timeout=0.01)")
def test_send_on_closed_channel(self, testdir, mysetup):
item = testdir.getitem("def test_func(): pass")
node = mysetup.makenode(item.config)
node.channel.close()
py.test.raises(IOError, "node.send(item)")
#ev = self.geteventargs(event.InternalException)
#assert ev.excinfo.errisinstance(IOError)
def test_send_one(self, testdir, mysetup):
item = testdir.getitem("def test_func(): pass")
node = mysetup.makenode(item.config)
node.send(item)
ev, = mysetup.geteventargs("itemtestreport")
assert ev.passed
assert ev.colitem == item
#assert event.item == item
#assert event.item is not item
def test_send_some(self, testdir, mysetup):
items = testdir.getitems("""
def test_pass():
pass
def test_fail():
assert 0
def test_skip():
import py
py.test.skip("x")
""")
node = mysetup.makenode(items[0].config)
for item in items:
node.send(item)
for outcome in "passed failed skipped".split():
ev, = mysetup.geteventargs("itemtestreport")
assert getattr(ev, outcome)
node.sendlist(items)
for outcome in "passed failed skipped".split():
ev, = mysetup.geteventargs("itemtestreport")
assert getattr(ev, outcome)

127
py/test/dist/txnode.py vendored Normal file
View File

@@ -0,0 +1,127 @@
"""
Manage setup, running and local representation of remote nodes/processes.
"""
import py
from py.__.test import event
from py.__.test.dist.mypickle import PickleChannel
class MasterNode(object):
""" Install slave code, manage sending test tasks & receiving results """
ENDMARK = -1
def __init__(self, gateway, config, putevent):
self.config = config
self.putevent = putevent
self.gateway = gateway
self.channel = install_slave(gateway, config)
self.channel.setcallback(self.callback, endmarker=self.ENDMARK)
self._down = False
def notify(self, eventname, *args, **kwargs):
self.putevent((eventname, args, kwargs))
def callback(self, eventcall):
""" this gets called for each object we receive from
the other side and if the channel closes.
Note that channel callbacks run in the receiver
thread of execnet gateways - we need to
avoid raising exceptions or doing heavy work.
"""
try:
if eventcall == self.ENDMARK:
err = self.channel._getremoteerror()
if not self._down:
if not err:
err = "Not properly terminated"
self.notify("testnodedown", self, err)
self._down = True
return
eventname, args, kwargs = eventcall
if eventname == "slaveready":
self.notify("testnodeready", self)
elif eventname == "slavefinished":
self._down = True
self.notify("testnodedown", self, None)
elif eventname == "itemtestreport":
rep = args[0]
rep.node = self
self.notify("itemtestreport", rep)
else:
self.notify(eventname, *args, **kwargs)
except KeyboardInterrupt:
# should not land in receiver-thread
raise
except:
excinfo = py.code.ExceptionInfo()
print "!" * 20, excinfo
self.notify("internalerror", event.InternalException(excinfo))
def send(self, item):
assert item is not None
self.channel.send(item)
def sendlist(self, itemlist):
self.channel.send(itemlist)
def shutdown(self):
self.channel.send(None)
# setting up slave code
def install_slave(gateway, config):
channel = gateway.remote_exec(source="""
from py.__.test.dist.mypickle import PickleChannel
from py.__.test.dist.txnode import SlaveNode
channel = PickleChannel(channel)
slavenode = SlaveNode(channel)
slavenode.run()
""")
channel = PickleChannel(channel)
basetemp = None
if gateway.spec.popen:
popenbase = config.ensuretemp("popen")
basetemp = py.path.local.make_numbered_dir(prefix="slave-",
keep=0, rootdir=popenbase)
basetemp = str(basetemp)
channel.send((config, basetemp))
return channel
class SlaveNode(object):
def __init__(self, channel):
self.channel = channel
def __repr__(self):
return "<%s channel=%s>" %(self.__class__.__name__, self.channel)
def sendevent(self, eventname, *args, **kwargs):
self.channel.send((eventname, args, kwargs))
def run(self):
channel = self.channel
self.config, basetemp = channel.receive()
if basetemp:
self.config.basetemp = py.path.local(basetemp)
self.config.pytestplugins.do_configure(self.config)
self.sendevent("slaveready")
try:
while 1:
task = channel.receive()
if task is None:
self.sendevent("slavefinished")
break
if isinstance(task, list):
for item in task:
self.runtest(item)
else:
self.runtest(task)
except KeyboardInterrupt:
raise
except:
self.sendevent("internalerror", event.InternalException())
raise
def runtest(self, item):
runner = item._getrunner()
testrep = runner(item)
self.sendevent("itemtestreport", testrep)