207 lines
		
	
	
		
			6.6 KiB
		
	
	
	
		
			Python
		
	
	
	
			
		
		
	
	
			207 lines
		
	
	
		
			6.6 KiB
		
	
	
	
		
			Python
		
	
	
	
import Queue 
 | 
						|
import threading
 | 
						|
import time 
 | 
						|
import sys
 | 
						|
 | 
						|
# unique sentinel: Reply._setexcinfo() puts it on the reply queue instead of
# a result, and Reply.get() recognizes it to know that the task raised
ERRORMARKER = object() 
 | 
						|
 | 
						|
class Reply(object): 
 | 
						|
    """ reply instances provide access to the result
 | 
						|
        of a function execution that got dispatched
 | 
						|
        through WorkerPool.dispatch() 
 | 
						|
    """
 | 
						|
    _excinfo = None 
 | 
						|
    def __init__(self, task): 
 | 
						|
        self.task = task 
 | 
						|
        self._queue = Queue.Queue() 
 | 
						|
 | 
						|
    def _set(self, result): 
 | 
						|
        self._queue.put(result) 
 | 
						|
 | 
						|
    def _setexcinfo(self, excinfo): 
 | 
						|
        self._excinfo = excinfo 
 | 
						|
        self._queue.put(ERRORMARKER) 
 | 
						|
 | 
						|
    def _get_with_timeout(self, timeout): 
 | 
						|
        # taken from python2.3's Queue.get() 
 | 
						|
        # we want to run on python2.2 here
 | 
						|
        delay = 0.0005 # 500 us -> initial delay of 1 ms
 | 
						|
        endtime = time.time() + timeout
 | 
						|
        while 1:
 | 
						|
            try: 
 | 
						|
                return self._queue.get_nowait() 
 | 
						|
            except Queue.Empty: 
 | 
						|
                remaining = endtime - time.time() 
 | 
						|
                if remaining <= 0:  #time is over and no element arrived
 | 
						|
                    raise IOError("timeout waiting for task %r" %(self.task,))
 | 
						|
                delay = min(delay * 2, remaining, .05)
 | 
						|
                time.sleep(delay)       #reduce CPU usage by using a sleep
 | 
						|
 | 
						|
    def get(self, timeout=None): 
 | 
						|
        """ get the result object from an asynchronous function execution.
 | 
						|
            if the function execution raised an exception, 
 | 
						|
            then calling get() will reraise that exception
 | 
						|
            including its traceback. 
 | 
						|
        """
 | 
						|
        if self._queue is None: 
 | 
						|
            raise EOFError("reply has already been delivered")
 | 
						|
        if timeout is not None: 
 | 
						|
            result = self._get_with_timeout(timeout) 
 | 
						|
        else: 
 | 
						|
            result = self._queue.get() 
 | 
						|
        if result is ERRORMARKER: 
 | 
						|
            self._queue = None
 | 
						|
            excinfo = self._excinfo 
 | 
						|
            raise excinfo[0], excinfo[1], excinfo[2]
 | 
						|
        return result 
 | 
						|
 | 
						|
class WorkerThread(threading.Thread):
    """ daemon thread which executes the tasks sent to it one
        after another and marks itself as ready in its pool
        after each completed task.
    """
    def __init__(self, pool):
        """ create a (not yet started) worker belonging to `pool`. """
        threading.Thread.__init__(self)
        self.setDaemon(1)
        self._pool = pool
        self._queue = Queue.Queue()

    def _run_once(self):
        """ wait for the next queued item and process it.

            return False if the thread should terminate (a stop
            request arrived or the task raised SystemExit or
            KeyboardInterrupt), otherwise True after the outcome
            of the task was stored in its reply.
        """
        item = self._queue.get()
        if item is SystemExit:
            # stop() queues the SystemExit class as termination request
            return False
        assert self not in self._pool._ready
        job = item.task
        try:
            function, posargs, keywords = job
            outcome = function(*posargs, **keywords)
        except (SystemExit, KeyboardInterrupt):
            return False
        except:
            # any other failure is handed to whoever waits on the reply
            item._setexcinfo(sys.exc_info())
        else:
            item._set(outcome)
        # returning drops all locals, so no reference to the reply or
        # the task is kept by this thread afterwards
        return True

    def run(self):
        """ process queued tasks until told to terminate, then
            remove this thread from the bookkeeping of the pool.
        """
        pool = self._pool
        try:
            while self._run_once():
                pool._ready[self] = True
        finally:
            del pool._alive[self]
            try:
                del pool._ready[self]
            except KeyError:
                # the thread was not registered as ready when it ended
                pass

    def send(self, task):
        """ queue `task` for execution and return its Reply. """
        pending = Reply(task)
        self._queue.put(pending)
        return pending

    def stop(self):
        """ ask the thread to terminate once it reaches this request. """
        self._queue.put(SystemExit)
 | 
						|
        
 | 
						|
class WorkerPool(object):
    """ A WorkerPool dispatches function executions
        to threads.  Each worker thread is reused for multiple
        function executions. The dispatching operation
        takes care of reusing an idle thread or creating
        a new one.

        You need to call shutdown() to signal
        the WorkerThreads to terminate and join()
        in order to wait until all worker threads
        have terminated.
    """
    _shuttingdown = False

    def __init__(self, maxthreads=None):
        """ init WorkerPool instance which may
            create up to `maxthreads` worker threads
            (no limit if `maxthreads` is None or 0).
        """
        self.maxthreads = maxthreads
        # both dicts are used as sets of threads (values are always True)
        self._ready = {}    # idle threads available for dispatch()
        self._alive = {}    # every thread that has not terminated yet

    def dispatch(self, func, *args, **kwargs):
        """ return Reply object for the asynchronous dispatch
            of the given func(*args, **kwargs) in a
            separate worker thread.

            raise IOError if the pool is shutting down or if no
            thread is idle and `maxthreads` threads are alive.
        """
        if self._shuttingdown:
            raise IOError("WorkerPool is already shutting down")
        try:
            thread, _ = self._ready.popitem()
        except KeyError: # no idle thread available
            if self.maxthreads and len(self._alive) >= self.maxthreads:
                raise IOError("can't create more than %d threads." %
                              (self.maxthreads,))
            thread = self._newthread()
        return thread.send((func, args, kwargs))

    def _newthread(self):
        """ create, register and start a new worker thread. """
        thread = WorkerThread(self)
        self._alive[thread] = True
        thread.start()
        return thread

    def shutdown(self):
        """ signal all worker threads to terminate.
            call join() to wait until all threads have terminated.
        """
        if not self._shuttingdown:
            self._shuttingdown = True
            # iterate over a snapshot: terminating workers delete
            # themselves from _alive while we are looping
            for t in list(self._alive.keys()):
                t.stop()

    def join(self, timeout=None):
        """ wait until all worker threads have terminated.

            `timeout` is the total number of seconds to wait for all
            threads together (None waits indefinitely); IOError is
            raised if a thread is still alive when it expires.
        """
        deadline = delta = None
        if timeout is not None:
            deadline = time.time() + timeout
        # iterate over a snapshot: terminating workers delete
        # themselves from _alive while we are looping
        for thread in list(self._alive.keys()):
            if deadline is not None:
                # each thread only gets what is left of the total budget
                delta = deadline - time.time()
                if delta <= 0:
                    raise IOError("timeout while joining threads")
            thread.join(timeout=delta)
            if thread.isAlive():
                raise IOError("timeout while joining threads")
 | 
						|
 | 
						|
class NamedThreadPool:
    """ keeps plain threads in named groups: each name maps
        to the list of threads that were started under it.
    """
    def __init__(self, **kw):
        """ start one thread per keyword argument; the keyword is
            the group name and its value the function to run.
        """
        self._namedthreads = {}
        for groupname, func in kw.items():
            self.start(groupname, func)

    def __repr__(self):
        return "<NamedThreadPool %r>" % (self._namedthreads,)

    def get(self, name=None):
        """ return the list of threads registered under `name`
            (empty if the name is unknown), or a new list with the
            threads of all groups if `name` is None.
        """
        if name is not None:
            return self._namedthreads.get(name, [])
        return [thread
                for group in self._namedthreads.values()
                for thread in group]

    def getstarted(self, name=None):
        """ like get(), but restricted to threads that are alive. """
        running = []
        for thread in self.get(name):
            if thread.isAlive():
                running.append(thread)
        return running

    def prunestopped(self, name=None):
        """ forget the threads of group `name` which are not alive;
            do so for every group if `name` is None.
        """
        if name is not None:
            self._namedthreads[name] = self.getstarted(name)
            return
        for groupname in self.names():
            self.prunestopped(groupname)

    def names(self):
        """ return the known group names. """
        return self._namedthreads.keys()

    def start(self, name, func):
        """ start a thread running `func` and add it to group `name`.
            the thread is named after the group plus its index in it.
        """
        group = self._namedthreads.setdefault(name, [])
        threadname = "%s%d" % (name, len(group))
        thread = threading.Thread(target=func, name=threadname)
        thread.start()
        group.append(thread)
 | 
						|
 |