159 lines
		
	
	
		
			3.6 KiB
		
	
	
	
		
			Python
		
	
	
	
			
		
		
	
	
			159 lines
		
	
	
		
			3.6 KiB
		
	
	
	
		
			Python
		
	
	
	
import py
 | 
						|
try:
 | 
						|
    from py.magic import greenlet
 | 
						|
except (ImportError, RuntimeError), e:
 | 
						|
    py.test.skip(str(e))
 | 
						|
 | 
						|
import sys, gc
 | 
						|
from py.test import raises
 | 
						|
try:
 | 
						|
    import thread, threading
 | 
						|
except ImportError:
 | 
						|
    thread = None
 | 
						|
 | 
						|
def test_simple():
    """Check that control alternates between the caller and a greenlet.

    Both sides append to a shared list around each switch; the final
    contents must be 0..4 in order, which only happens if every switch
    resumes the other side exactly where it left off.
    """
    lst = []

    def f():
        lst.append(1)
        # Hand control back to the parent of the running greenlet.
        greenlet.getcurrent().parent.switch()
        lst.append(3)

    g = greenlet(f)
    lst.append(0)
    g.switch()
    lst.append(2)
    g.switch()
    lst.append(4)
    # list(...) keeps the comparison meaningful on interpreters where
    # range() does not return a list.
    assert lst == list(range(5))
 | 
						|
 | 
						|
def test_threads():
    """Run test_simple in ten threads and require every one to complete.

    Each worker appends a marker only after test_simple() returned
    without raising, so the marker count must equal the thread count.
    """
    if thread is None:
        py.test.skip("this is a test about thread")

    finished = []

    def worker():
        test_simple()
        finished.append(True)

    workers = []
    for _ in range(10):
        workers.append(threading.Thread(target=worker))

    # Start everything first, then wait for everything.
    for w in workers:
        w.start()
    for w in workers:
        w.join()

    assert len(finished) == len(workers)
 | 
						|
 | 
						|
 | 
						|
class SomeError(Exception):
    """Marker exception raised by fmain() once its switch returns normally."""
 | 
						|
 | 
						|
def fmain(seen):
    """Greenlet body used by the tests: park in the parent, then fail.

    Switches to the parent of the current greenlet.  If an exception is
    raised at that switch point, its type is appended to ``seen`` and the
    exception is re-raised unchanged.  If the switch returns normally,
    SomeError is raised instead.
    """
    try:
        greenlet.getcurrent().parent.switch()
    except:
        # Deliberately bare: every exception type must be recorded,
        # and it is re-raised immediately afterwards.
        exc_type = sys.exc_info()[0]
        seen.append(exc_type)
        raise
    else:
        raise SomeError
 | 
						|
 | 
						|
def test_exception():
    """Check where an exception raised in a greenlet ends up.

    Two fmain greenlets are started and left parked at their switch.
    The second is then re-parented onto the first and resumed, which
    makes it raise SomeError outside its own try block.  The assertions
    require that exactly one SomeError gets recorded in the shared list
    and that the error also reaches the caller.
    """
    recorded = []
    first = greenlet(fmain)
    second = greenlet(fmain)
    first.switch(recorded)
    second.switch(recorded)
    second.parent = first
    # Both greenlets are parked; nothing has been raised so far.
    assert recorded == []

    raises(SomeError, second.switch)
    assert recorded == [SomeError]

    # Switching into the second greenlet again must record nothing new.
    second.switch()
    assert recorded == [SomeError]
 | 
						|
 | 
						|
def send_exception(g, exc):
    """Raise ``exc`` in a throw-away greenlet whose parent is ``g``.

    A child greenlet is created with ``g`` as parent and immediately
    switched into with ``exc``; its only action is to raise it.
    """
    # note: the same effect is available directly as g.throw(exc).
    # This helper exists to explicitly check the propagation rules.
    def _raise_it(error):
        raise error

    child = greenlet(_raise_it, parent=g)
    child.switch(exc)
 | 
						|
 | 
						|
def test_send_exception():
    """Check that send_exception() delivers KeyError to a parked greenlet.

    An fmain greenlet is started and left parked at its switch.  Sending
    KeyError through send_exception() must be recorded by fmain's handler
    and must also surface in the caller.
    """
    seen = []
    g1 = greenlet(fmain)
    g1.switch(seen)
    # Callable form of raises(), matching test_exception; it avoids
    # evaluating a source string in this frame.
    raises(KeyError, send_exception, g1, KeyError)
    assert seen == [KeyError]
 | 
						|
 | 
						|
def test_dealloc():
    """Check that dropping a parked greenlet raises GreenletExit inside it.

    Two fmain greenlets are started and left parked.  Each time the last
    reference to one of them is deleted, fmain's handler is expected to
    record one greenlet.GreenletExit.
    """
    recorded = []
    first = greenlet(fmain)
    second = greenlet(fmain)
    first.switch(recorded)
    second.switch(recorded)
    assert recorded == []

    # Drop the only reference to the first greenlet.
    del first
    gc.collect()
    assert recorded == [greenlet.GreenletExit]

    # Same again for the second one.
    del second
    gc.collect()
    assert recorded == [greenlet.GreenletExit] * 2
 | 
						|
 | 
						|
def test_dealloc_other_thread():
    """Check deallocation of a greenlet from a thread that does not own it.

    A worker thread creates and parks an fmain greenlet and hands the
    only remaining reference to the main thread.  The main thread drops
    that reference; the test expects GreenletExit to be recorded only
    after the worker thread creates another greenlet.  Two locks are
    used as a hand-shake: the worker releases ``worker_done`` to wake the
    main thread, and the main thread releases ``main_done`` to wake the
    worker.
    """
    if thread is None:
        py.test.skip("this is a test about thread")

    recorded = []
    handoff = []
    # Both locks start out held so the first acquire() on each blocks.
    worker_done = thread.allocate_lock()
    worker_done.acquire()
    main_done = thread.allocate_lock()
    main_done.acquire()

    def worker():
        parked = greenlet(fmain)
        parked.switch(recorded)
        handoff.append(parked)
        del parked
        gc.collect()
        worker_done.release()
        main_done.acquire()
        greenlet()   # trigger release
        worker_done.release()
        main_done.acquire()

    worker_thread = threading.Thread(target=worker)
    worker_thread.start()

    # Wait until the worker has parked its greenlet and handed it over.
    worker_done.acquire()
    assert recorded == []
    assert len(handoff) == 1
    del handoff[:]
    gc.collect()
    # the greenlet is not released immediately because it's from another thread
    assert recorded == []

    # Let the worker create a new greenlet, then wait for it to report back.
    main_done.release()
    worker_done.acquire()
    assert recorded == [greenlet.GreenletExit]

    # Unblock the worker's final acquire() so the thread can finish.
    main_done.release()
    worker_thread.join()
 | 
						|
 | 
						|
def test_frame():
    """Check the gr_frame attribute and truthiness of a greenlet.

    While the greenlet is parked, gr_frame must be the frame object of
    its running function and the greenlet must be truthy.  After the
    function has returned, the greenlet must be falsy, gr_frame must be
    None, and the final switch() must yield the function's return value.
    """
    def f1():
        f = sys._getframe(0)
        # The greenlet's outermost frame has no caller frame.
        assert f.f_back is None
        # Pass the frame object out to the parent for comparison.
        greenlet.getcurrent().parent.switch(f)
        return "meaning of life"

    g = greenlet(f1)
    frame = g.switch()
    assert frame is g.gr_frame
    assert g
    # Named 'result' rather than 'next' to avoid shadowing the builtin.
    result = g.switch()
    assert not g
    assert result == "meaning of life"
    assert g.gr_frame is None
 | 
						|
 | 
						|
def test_thread_bug():
    """Run a sleeping greenlet in each of two concurrent threads.

    The test has no assertions; it passes when both threads, each of
    which switches into a greenlet that sleeps, can be joined.
    """
    if thread is None:
        py.test.skip("this is a test about thread")
    import time

    def runner(x):
        def nap():
            return time.sleep(x)
        sleeper = greenlet(nap)
        sleeper.switch()

    # Two overlapping sleeps of different lengths (0.2s and 0.3s).
    threads = [
        threading.Thread(target=runner, args=(delay,))
        for delay in (0.2, 0.3)
    ]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
 |