159 lines
		
	
	
		
			3.6 KiB
		
	
	
	
		
			Python
		
	
	
	
			
		
		
	
	
			159 lines
		
	
	
		
			3.6 KiB
		
	
	
	
		
			Python
		
	
	
	
| import py
 | |
| try:
 | |
|     from py.magic import greenlet
 | |
| except (ImportError, RuntimeError), e:
 | |
|     py.test.skip(str(e))
 | |
| 
 | |
| import sys, gc
 | |
| from py.test import raises
 | |
| try:
 | |
|     import thread, threading
 | |
| except ImportError:
 | |
|     thread = None
 | |
| 
 | |
| def test_simple():
 | |
|     lst = []
 | |
|     def f():
 | |
|         lst.append(1)
 | |
|         greenlet.getcurrent().parent.switch()
 | |
|         lst.append(3)
 | |
|     g = greenlet(f)
 | |
|     lst.append(0)
 | |
|     g.switch()
 | |
|     lst.append(2)
 | |
|     g.switch()
 | |
|     lst.append(4)
 | |
|     assert lst == range(5)
 | |
| 
 | |
| def test_threads():
 | |
|     if not thread:
 | |
|         py.test.skip("this is a test about thread")
 | |
|     success = []
 | |
|     def f():
 | |
|         test_simple()
 | |
|         success.append(True)
 | |
|     ths = [threading.Thread(target=f) for i in range(10)]
 | |
|     for th in ths:
 | |
|         th.start()
 | |
|     for th in ths:
 | |
|         th.join()
 | |
|     assert len(success) == len(ths)
 | |
| 
 | |
| 
 | |
| class SomeError(Exception):
 | |
|     pass
 | |
| 
 | |
| def fmain(seen):
 | |
|     try:
 | |
|         greenlet.getcurrent().parent.switch()
 | |
|     except:
 | |
|         seen.append(sys.exc_info()[0])
 | |
|         raise
 | |
|     raise SomeError
 | |
| 
 | |
| def test_exception():
 | |
|     seen = []
 | |
|     g1 = greenlet(fmain)
 | |
|     g2 = greenlet(fmain)
 | |
|     g1.switch(seen)
 | |
|     g2.switch(seen)
 | |
|     g2.parent = g1
 | |
|     assert seen == []
 | |
|     raises(SomeError, g2.switch)
 | |
|     assert seen == [SomeError]
 | |
|     g2.switch()
 | |
|     assert seen == [SomeError]
 | |
| 
 | |
| def send_exception(g, exc):
 | |
|     # note: send_exception(g, exc)  can be now done with  g.throw(exc).
 | |
|     # the purpose of this test is to explicitely check the propagation rules.
 | |
|     def crasher(exc):
 | |
|         raise exc
 | |
|     g1 = greenlet(crasher, parent=g)
 | |
|     g1.switch(exc)
 | |
| 
 | |
| def test_send_exception():
 | |
|     seen = []
 | |
|     g1 = greenlet(fmain)
 | |
|     g1.switch(seen)
 | |
|     raises(KeyError, "send_exception(g1, KeyError)")
 | |
|     assert seen == [KeyError]
 | |
| 
 | |
| def test_dealloc():
 | |
|     seen = []
 | |
|     g1 = greenlet(fmain)
 | |
|     g2 = greenlet(fmain)
 | |
|     g1.switch(seen)
 | |
|     g2.switch(seen)
 | |
|     assert seen == []
 | |
|     del g1
 | |
|     gc.collect()
 | |
|     assert seen == [greenlet.GreenletExit]
 | |
|     del g2
 | |
|     gc.collect()
 | |
|     assert seen == [greenlet.GreenletExit, greenlet.GreenletExit]
 | |
| 
 | |
| def test_dealloc_other_thread():
 | |
|     if not thread:
 | |
|         py.test.skip("this is a test about thread")
 | |
|     seen = []
 | |
|     someref = []
 | |
|     lock = thread.allocate_lock()
 | |
|     lock.acquire()
 | |
|     lock2 = thread.allocate_lock()
 | |
|     lock2.acquire()
 | |
|     def f():
 | |
|         g1 = greenlet(fmain)
 | |
|         g1.switch(seen)
 | |
|         someref.append(g1)
 | |
|         del g1
 | |
|         gc.collect()
 | |
|         lock.release()
 | |
|         lock2.acquire()
 | |
|         greenlet()   # trigger release
 | |
|         lock.release()
 | |
|         lock2.acquire()
 | |
|     t = threading.Thread(target=f)
 | |
|     t.start()
 | |
|     lock.acquire()
 | |
|     assert seen == []
 | |
|     assert len(someref) == 1
 | |
|     del someref[:]
 | |
|     gc.collect()
 | |
|     # g1 is not released immediately because it's from another thread
 | |
|     assert seen == []
 | |
|     lock2.release()
 | |
|     lock.acquire()
 | |
|     assert seen == [greenlet.GreenletExit]
 | |
|     lock2.release()
 | |
|     t.join()
 | |
| 
 | |
| def test_frame():
 | |
|     def f1():
 | |
|         f = sys._getframe(0)
 | |
| 	assert f.f_back is None
 | |
| 	greenlet.getcurrent().parent.switch(f)
 | |
| 	return "meaning of life"
 | |
|     g = greenlet(f1)
 | |
|     frame = g.switch()
 | |
|     assert frame is g.gr_frame
 | |
|     assert g
 | |
|     next = g.switch()
 | |
|     assert not g
 | |
|     assert next == "meaning of life"
 | |
|     assert g.gr_frame is None
 | |
| 
 | |
| def test_thread_bug():
 | |
|     if not thread:
 | |
|         py.test.skip("this is a test about thread")
 | |
|     import time
 | |
|     def runner(x):
 | |
|         g = greenlet(lambda: time.sleep(x))
 | |
|         g.switch()
 | |
|     t1 = threading.Thread(target=runner, args=(0.2,))
 | |
|     t2 = threading.Thread(target=runner, args=(0.3,))
 | |
|     t1.start()
 | |
|     t2.start()
 | |
|     t1.join()
 | |
|     t2.join()
 |