73 lines
		
	
	
		
			2.0 KiB
		
	
	
	
		
			Python
		
	
	
	
			
		
		
	
	
			73 lines
		
	
	
		
			2.0 KiB
		
	
	
	
		
			Python
		
	
	
	
| 
 | |
| import py
 | |
| import sys
 | |
| 
 | |
# Module-level shortcuts for the two py._thread classes used by the tests
# in this file.
WorkerPool = py._thread.WorkerPool
ThreadOut = py._thread.ThreadOut
 | |
| 
 | |
def test_threadout_install_deinstall():
    """Creating a ThreadOut on sys.stdout and then deinstalling it must
    leave sys.stdout equal to what it was beforehand."""
    saved_stdout = sys.stdout
    redirector = ThreadOut(sys, 'stdout')
    redirector.deinstall()
    assert saved_stdout == sys.stdout
 | |
| 
 | |
| class TestThreadOut:
 | |
|     def test_threadout_one(self):
 | |
|         out = ThreadOut(sys, 'stdout')
 | |
|         try:
 | |
|             l = []
 | |
|             out.setwritefunc(l.append)
 | |
|             print 42,13,
 | |
|             x = l.pop(0)
 | |
|             assert x == '42'
 | |
|             x = l.pop(0)
 | |
|             assert x == ' '
 | |
|             x = l.pop(0)
 | |
|             assert x == '13'
 | |
|         finally:
 | |
|             out.deinstall()
 | |
| 
 | |
|     def test_threadout_multi_and_default(self):
 | |
|         out = ThreadOut(sys, 'stdout')
 | |
|         try:
 | |
|             num = 3
 | |
|             defaults = []
 | |
|             def f(l):
 | |
|                 out.setwritefunc(l.append)
 | |
|                 print id(l),
 | |
|                 out.delwritefunc()
 | |
|                 print 1
 | |
|             out.setdefaultwriter(defaults.append)
 | |
|             pool = WorkerPool()
 | |
|             listlist = []
 | |
|             for x in range(num):
 | |
|                 l = []
 | |
|                 listlist.append(l)
 | |
|                 pool.dispatch(f, l)
 | |
|             pool.shutdown()
 | |
|             for name, value in out.__dict__.items():
 | |
|                 print >>sys.stderr, "%s: %s" %(name, value)
 | |
|             pool.join(2.0)
 | |
|             for i in range(num):
 | |
|                 item = listlist[i]
 | |
|                 assert item ==[str(id(item))]
 | |
|             assert not out._tid2out
 | |
|             assert defaults
 | |
|             expect = ['1' for x in range(num)]
 | |
|             defaults = [x for x in defaults if x.strip()]
 | |
|             assert defaults == expect
 | |
|         finally:
 | |
|             out.deinstall()
 | |
| 
 | |
|     def test_threadout_nested(self):
 | |
|         out1 = ThreadOut(sys, 'stdout')
 | |
|         try:
 | |
|             # we want ThreadOuts to coexist
 | |
|             last = sys.stdout
 | |
|             out = ThreadOut(sys, 'stdout')
 | |
|             assert last == sys.stdout
 | |
|             out.deinstall()
 | |
|             assert last == sys.stdout
 | |
|         finally:
 | |
|             out1.deinstall()
 |