[svn r45548] - refactoring cleanup mechanics into its own class
- have setDaemon(1) for the receiverThread as otherwise on python2.5 atexit will not be invoked (the receiver thread apparently blocks it) --HG-- branch : trunk
This commit is contained in:
parent
3cb7d3579a
commit
62634136b9
|
@ -28,30 +28,55 @@ debug = 0 # open('/tmp/execnet-debug-%d' % os.getpid() , 'wa')
|
||||||
|
|
||||||
sysex = (KeyboardInterrupt, SystemExit)
|
sysex = (KeyboardInterrupt, SystemExit)
|
||||||
|
|
||||||
|
# ----------------------------------------------------------
|
||||||
|
# cleanup machinery (for exiting processes)
|
||||||
|
# ----------------------------------------------------------
|
||||||
|
|
||||||
|
class GatewayCleanup:
|
||||||
|
def __init__(self):
|
||||||
|
self._activegateways = weakref.WeakKeyDictionary()
|
||||||
|
atexit.register(self.cleanup_atexit)
|
||||||
|
|
||||||
|
def register(self, gateway):
|
||||||
|
assert gateway not in self._activegateways
|
||||||
|
self._activegateways[gateway] = True
|
||||||
|
|
||||||
|
def unregister(self, gateway):
|
||||||
|
del self._activegateways[gateway]
|
||||||
|
|
||||||
|
def cleanup_atexit(self):
|
||||||
|
if debug:
|
||||||
|
print >>debug, "="*20 + "cleaning up" + "=" * 20
|
||||||
|
debug.flush()
|
||||||
|
for gw in self._activegateways.keys():
|
||||||
|
gw.exit()
|
||||||
|
gw.join() # should work as well
|
||||||
|
|
||||||
|
# ----------------------------------------------------------
|
||||||
|
# Base Gateway (used for both remote and local side)
|
||||||
|
# ----------------------------------------------------------
|
||||||
|
|
||||||
class Gateway(object):
|
class Gateway(object):
|
||||||
class _StopExecLoop(Exception): pass
|
class _StopExecLoop(Exception): pass
|
||||||
_ThreadOut = ThreadOut
|
_ThreadOut = ThreadOut
|
||||||
remoteaddress = ""
|
remoteaddress = ""
|
||||||
_requestqueue = None
|
_requestqueue = None
|
||||||
|
_cleanup = GatewayCleanup()
|
||||||
|
|
||||||
def __init__(self, io, _startcount=2):
|
def __init__(self, io, _startcount=2):
|
||||||
""" initialize core gateway, using the given
|
""" initialize core gateway, using the given
|
||||||
inputoutput object.
|
inputoutput object.
|
||||||
"""
|
"""
|
||||||
global registered_cleanup, _activegateways
|
|
||||||
self._io = io
|
self._io = io
|
||||||
self._channelfactory = ChannelFactory(self, _startcount)
|
self._channelfactory = ChannelFactory(self, _startcount)
|
||||||
if not registered_cleanup:
|
self._cleanup.register(self)
|
||||||
atexit.register(cleanup_atexit)
|
|
||||||
registered_cleanup = True
|
|
||||||
_activegateways[self] = True
|
|
||||||
|
|
||||||
def _initreceive(self, requestqueue=False):
|
def _initreceive(self, requestqueue=False):
|
||||||
if requestqueue:
|
if requestqueue:
|
||||||
self._requestqueue = Queue.Queue()
|
self._requestqueue = Queue.Queue()
|
||||||
self._receiverthread = threading.Thread(name="receiver",
|
self._receiverthread = threading.Thread(name="receiver",
|
||||||
target=self._thread_receiver)
|
target=self._thread_receiver)
|
||||||
self._receiverthread.setDaemon(0)
|
self._receiverthread.setDaemon(1)
|
||||||
self._receiverthread.start()
|
self._receiverthread.start()
|
||||||
|
|
||||||
def __repr__(self):
|
def __repr__(self):
|
||||||
|
@ -296,6 +321,7 @@ class Gateway(object):
|
||||||
|
|
||||||
def exit(self):
|
def exit(self):
|
||||||
""" Try to stop all exec and IO activity. """
|
""" Try to stop all exec and IO activity. """
|
||||||
|
self._cleanup.unregister(self)
|
||||||
self._stopexec()
|
self._stopexec()
|
||||||
self._stopsend()
|
self._stopsend()
|
||||||
|
|
||||||
|
@ -323,13 +349,3 @@ def getid(gw, cache={}):
|
||||||
cache[name][id(gw)] = x = "%s:%s.%d" %(os.getpid(), gw.__class__.__name__, len(cache[name]))
|
cache[name][id(gw)] = x = "%s:%s.%d" %(os.getpid(), gw.__class__.__name__, len(cache[name]))
|
||||||
return x
|
return x
|
||||||
|
|
||||||
registered_cleanup = False
|
|
||||||
_activegateways = weakref.WeakKeyDictionary()
|
|
||||||
def cleanup_atexit():
|
|
||||||
if debug:
|
|
||||||
print >>debug, "="*20 + "cleaning up" + "=" * 20
|
|
||||||
debug.flush()
|
|
||||||
while _activegateways:
|
|
||||||
gw, ignored = _activegateways.popitem()
|
|
||||||
gw.exit()
|
|
||||||
#gw.join() should work as well
|
|
||||||
|
|
Loading…
Reference in New Issue