218 lines
		
	
	
		
			6.7 KiB
		
	
	
	
		
			Python
		
	
	
	
			
		
		
	
	
			218 lines
		
	
	
		
			6.7 KiB
		
	
	
	
		
			Python
		
	
	
	
 | 
						|
""" This is an implementation of an execnet protocol on top
 | 
						|
of a transport layer provided by the greensock2 interface.
 | 
						|
 | 
						|
It has the same semantics, but does not use threads at all
 | 
						|
(which makes it suitable for specific enviroments, like pypy-c).
 | 
						|
 | 
						|
There are some features lacking, most notable:
 | 
						|
- callback support for channels
 | 
						|
- socket gateway
 | 
						|
- bootstrapping (there is assumption of pylib being available
 | 
						|
  on remote side, which is not always true)
 | 
						|
"""
 | 
						|
 | 
						|
import sys, os, py, inspect
 | 
						|
from pygreen import greensock2
 | 
						|
from pygreen.msgstruct import message, decodemessage
 | 
						|
 | 
						|
MSG_REMOTE_EXEC = 'r'
 | 
						|
MSG_OBJECT      = 'o'
 | 
						|
MSG_ERROR       = 'e'
 | 
						|
MSG_CHAN_CLOSE  = 'c'
 | 
						|
MSG_FATAL       = 'f'
 | 
						|
MSG_CHANNEL  = 'n'
 | 
						|
 | 
						|
class Gateway(object):
 | 
						|
 | 
						|
    def __init__(self, input, output, is_remote=False):
 | 
						|
        self.input  = input
 | 
						|
        self.output = output
 | 
						|
        self.nextchannum = int(is_remote)
 | 
						|
        self.receivers = {}
 | 
						|
        self.greenlet = greensock2.autogreenlet(self.serve_forever, is_remote)
 | 
						|
 | 
						|
    def remote_exec(self, remote_source):
 | 
						|
        remote_source = py.code.Source(remote_source)
 | 
						|
        chan = self.newchannel()
 | 
						|
        msg = message(MSG_REMOTE_EXEC, chan.n, str(remote_source))
 | 
						|
        self.output.sendall(msg)
 | 
						|
        return chan
 | 
						|
 | 
						|
    def newchannel(self):
 | 
						|
        n = self.nextchannum
 | 
						|
        self.nextchannum += 2
 | 
						|
        return self.make_channel(n)
 | 
						|
 | 
						|
    def make_channel(self, n):
 | 
						|
        giver, accepter = greensock2.meetingpoint()
 | 
						|
        assert n not in self.receivers
 | 
						|
        self.receivers[n] = giver
 | 
						|
        return Channel(self, n, accepter)
 | 
						|
 | 
						|
    def serve_forever(self, is_remote=False):
 | 
						|
        try:
 | 
						|
            buffer = ""
 | 
						|
            while 1:
 | 
						|
                msg, buffer = decodemessage(buffer)
 | 
						|
                if msg is None:
 | 
						|
                    buffer += self.input.recv(16384)
 | 
						|
                else:
 | 
						|
                    handler = HANDLERS[msg[0]]
 | 
						|
                    handler(self, *msg[1:])
 | 
						|
        except greensock2.greenlet.GreenletExit:
 | 
						|
            raise
 | 
						|
        except:
 | 
						|
            if is_remote:
 | 
						|
                msg = message(MSG_FATAL, format_error(*sys.exc_info()))
 | 
						|
                self.output.sendall(msg)
 | 
						|
            else:
 | 
						|
                raise
 | 
						|
 | 
						|
    def msg_remote_exec(self, n, source):
 | 
						|
        def do_execute(channel):
 | 
						|
            try:
 | 
						|
                d = {'channel': channel}
 | 
						|
                exec source in d
 | 
						|
            except:
 | 
						|
                channel.report_error(*sys.exc_info())
 | 
						|
            else:
 | 
						|
                channel.close()
 | 
						|
        greensock2.autogreenlet(do_execute, self.make_channel(n))
 | 
						|
 | 
						|
    def msg_object(self, n, objrepr):
 | 
						|
        obj = eval(objrepr)
 | 
						|
        if n in self.receivers:
 | 
						|
            self.receivers[n].give_queued(obj)
 | 
						|
 | 
						|
    def msg_error(self, n, s):
 | 
						|
        if n in self.receivers:
 | 
						|
            self.receivers[n].give_queued(RemoteError(s))
 | 
						|
            self.receivers[n].close()
 | 
						|
            del self.receivers[n]
 | 
						|
 | 
						|
    def msg_chan_close(self, n):
 | 
						|
        if n in self.receivers:
 | 
						|
            self.receivers[n].close()
 | 
						|
            del self.receivers[n]
 | 
						|
 | 
						|
    def msg_channel(self, n, m):
 | 
						|
        if n in self.receivers:
 | 
						|
            self.receivers[n].give_queued(self.make_channel(m))
 | 
						|
 | 
						|
    def msg_fatal(self, s):
 | 
						|
        raise RemoteError(s)
 | 
						|
 | 
						|
HANDLERS = {
 | 
						|
    MSG_REMOTE_EXEC: Gateway.msg_remote_exec,
 | 
						|
    MSG_OBJECT:      Gateway.msg_object,
 | 
						|
    MSG_ERROR:       Gateway.msg_error,
 | 
						|
    MSG_CHAN_CLOSE:  Gateway.msg_chan_close,
 | 
						|
    MSG_CHANNEL:     Gateway.msg_channel,
 | 
						|
    MSG_FATAL:       Gateway.msg_fatal,
 | 
						|
    }
 | 
						|
 | 
						|
 | 
						|
class Channel(object):
 | 
						|
 | 
						|
    def __init__(self, gw, n, accepter):
 | 
						|
        self.gw = gw
 | 
						|
        self.n = n
 | 
						|
        self.accepter = accepter
 | 
						|
 | 
						|
    def send(self, obj):
 | 
						|
        if isinstance(obj, Channel):
 | 
						|
            assert obj.gw is self.gw
 | 
						|
            msg = message(MSG_CHANNEL, self.n, obj.n)
 | 
						|
        else:
 | 
						|
            msg = message(MSG_OBJECT, self.n, repr(obj))
 | 
						|
        self.gw.output.sendall(msg)
 | 
						|
 | 
						|
    def receive(self):
 | 
						|
        obj = self.accepter.accept()
 | 
						|
        if isinstance(obj, RemoteError):
 | 
						|
            raise obj
 | 
						|
        else:
 | 
						|
            return obj
 | 
						|
 | 
						|
    def close(self):
 | 
						|
        try:
 | 
						|
            self.gw.output.sendall(message(MSG_CHAN_CLOSE, self.n))
 | 
						|
        except OSError:
 | 
						|
            pass
 | 
						|
 | 
						|
    def report_error(self, exc_type, exc_value, exc_traceback=None):
 | 
						|
        s = format_error(exc_type, exc_value, exc_traceback)
 | 
						|
        try:
 | 
						|
            self.gw.output.sendall(message(MSG_ERROR, self.n, s))
 | 
						|
        except OSError:
 | 
						|
            pass
 | 
						|
 | 
						|
 | 
						|
class RemoteError(Exception):
 | 
						|
    pass
 | 
						|
 | 
						|
def format_error(exc_type, exc_value, exc_traceback=None):
 | 
						|
    import traceback, StringIO
 | 
						|
    s = StringIO.StringIO()
 | 
						|
    traceback.print_exception(exc_type, exc_value, exc_traceback, file=s)
 | 
						|
    return s.getvalue()
 | 
						|
 | 
						|
 | 
						|
class PopenCmdGateway(Gateway):
 | 
						|
    action = "exec input()"
 | 
						|
 | 
						|
    def __init__(self, cmdline):
 | 
						|
        from pygreen.pipe.fd import FDInput, FDOutput
 | 
						|
        child_in, child_out = os.popen2(cmdline, 't', 0)
 | 
						|
        fdin  = FDInput(child_out.fileno(), child_out.close)
 | 
						|
        fdout = FDOutput(child_in.fileno(), child_in.close)
 | 
						|
        fdout.sendall(self.get_bootstrap_code())
 | 
						|
        super(PopenCmdGateway, self).__init__(input = fdin, output = fdout)
 | 
						|
 | 
						|
    def get_bootstrap_code():
 | 
						|
        # XXX assumes that the py lib is installed on the remote side
 | 
						|
        src = []
 | 
						|
        src.append('from pygreen import greenexecnet')
 | 
						|
        src.append('greenexecnet.PopenCmdGateway.run_server()')
 | 
						|
        src.append('')
 | 
						|
        return '%r\n' % ('\n'.join(src),)
 | 
						|
    get_bootstrap_code = staticmethod(get_bootstrap_code)
 | 
						|
 | 
						|
    def run_server():
 | 
						|
        from pygreen.pipe.fd import FDInput, FDOutput
 | 
						|
        gw = Gateway(input = FDInput(os.dup(0)),
 | 
						|
                     output = FDOutput(os.dup(1)),
 | 
						|
                     is_remote = True)
 | 
						|
        # for now, ignore normal I/O
 | 
						|
        fd = os.open('/dev/null', os.O_RDWR)
 | 
						|
        os.dup2(fd, 0)
 | 
						|
        os.dup2(fd, 1)
 | 
						|
        os.close(fd)
 | 
						|
        greensock2._suspend_forever()
 | 
						|
    run_server = staticmethod(run_server)
 | 
						|
 | 
						|
class PopenGateway(PopenCmdGateway):
 | 
						|
    def __init__(self, python=sys.executable):
 | 
						|
        cmdline = '"%s" -u -c "%s"' % (python, self.action)
 | 
						|
        super(PopenGateway, self).__init__(cmdline)
 | 
						|
 | 
						|
class SshGateway(PopenCmdGateway):
 | 
						|
    def __init__(self, sshaddress, remotepython='python', identity=None):
 | 
						|
        self.sshaddress = sshaddress
 | 
						|
        remotecmd = '%s -u -c "%s"' % (remotepython, self.action)
 | 
						|
        cmdline = [sshaddress, remotecmd]
 | 
						|
        # XXX Unix style quoting
 | 
						|
        for i in range(len(cmdline)):
 | 
						|
            cmdline[i] = "'" + cmdline[i].replace("'", "'\\''") + "'"
 | 
						|
        cmd = 'ssh -C'
 | 
						|
        if identity is not None: 
 | 
						|
            cmd += ' -i %s' % (identity,)
 | 
						|
        cmdline.insert(0, cmd) 
 | 
						|
        super(SshGateway, self).__init__(' '.join(cmdline))
 | 
						|
 | 
						|
 | 
						|
##f = open('LOG', 'a')
 | 
						|
##import os; print >> f, '[%d] READY' % (os.getpid(),)
 | 
						|
##f.close()
 |