From 62634136b9f1868ab1472ad10d0ce657d7a24692 Mon Sep 17 00:00:00 2001 From: hpk Date: Wed, 8 Aug 2007 13:45:04 +0200 Subject: [PATCH] [svn r45548] - refactoring cleanup mechanics into its own class - have setDaemon(1) for the receiverThread as otherwise on python2.5 atexit will not be invoked (the receiver thread apparently blocks it) --HG-- branch : trunk --- py/execnet/gateway.py | 48 ++++++++++++++++++++++++++++--------------- 1 file changed, 32 insertions(+), 16 deletions(-) diff --git a/py/execnet/gateway.py b/py/execnet/gateway.py index b2beae271..417d92359 100644 --- a/py/execnet/gateway.py +++ b/py/execnet/gateway.py @@ -28,30 +28,55 @@ debug = 0 # open('/tmp/execnet-debug-%d' % os.getpid() , 'wa') sysex = (KeyboardInterrupt, SystemExit) +# ---------------------------------------------------------- +# cleanup machinery (for exiting processes) +# ---------------------------------------------------------- + +class GatewayCleanup: + def __init__(self): + self._activegateways = weakref.WeakKeyDictionary() + atexit.register(self.cleanup_atexit) + + def register(self, gateway): + assert gateway not in self._activegateways + self._activegateways[gateway] = True + + def unregister(self, gateway): + del self._activegateways[gateway] + + def cleanup_atexit(self): + if debug: + print >>debug, "="*20 + "cleaning up" + "=" * 20 + debug.flush() + for gw in self._activegateways.keys(): + gw.exit() + gw.join() # should work as well + +# ---------------------------------------------------------- +# Base Gateway (used for both remote and local side) +# ---------------------------------------------------------- + class Gateway(object): class _StopExecLoop(Exception): pass _ThreadOut = ThreadOut remoteaddress = "" _requestqueue = None + _cleanup = GatewayCleanup() def __init__(self, io, _startcount=2): """ initialize core gateway, using the given inputoutput object. """ - global registered_cleanup, _activegateways self._io = io self._channelfactory = ChannelFactory(self, _startcount) - if not registered_cleanup: - atexit.register(cleanup_atexit) - registered_cleanup = True - _activegateways[self] = True + self._cleanup.register(self) def _initreceive(self, requestqueue=False): if requestqueue: self._requestqueue = Queue.Queue() self._receiverthread = threading.Thread(name="receiver", target=self._thread_receiver) - self._receiverthread.setDaemon(0) + self._receiverthread.setDaemon(1) self._receiverthread.start() def __repr__(self): @@ -296,6 +321,7 @@ class Gateway(object): def exit(self): """ Try to stop all exec and IO activity. """ + self._cleanup.unregister(self) self._stopexec() self._stopsend() @@ -323,13 +349,3 @@ def getid(gw, cache={}): cache[name][id(gw)] = x = "%s:%s.%d" %(os.getpid(), gw.__class__.__name__, len(cache[name])) return x -registered_cleanup = False -_activegateways = weakref.WeakKeyDictionary() -def cleanup_atexit(): - if debug: - print >>debug, "="*20 + "cleaning up" + "=" * 20 - debug.flush() - while _activegateways: - gw, ignored = _activegateways.popitem() - gw.exit() - #gw.join() should work as well