438 lines
		
	
	
		
			15 KiB
		
	
	
	
		
			Python
		
	
	
	
			
		
		
	
	
			438 lines
		
	
	
		
			15 KiB
		
	
	
	
		
			Python
		
	
	
	
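""" web server for py.test

serves the HTML/JavaScript front-end and a JSON event feed so that a test
run can be followed from a browser
"""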
 | 
						|
 | 
						|
from BaseHTTPServer import HTTPServer, BaseHTTPRequestHandler
 | 
						|
 | 
						|
import thread, threading
 | 
						|
import re
 | 
						|
import time
 | 
						|
import random
 | 
						|
import Queue
 | 
						|
import os
 | 
						|
import sys
 | 
						|
import socket
 | 
						|
 | 
						|
import py
 | 
						|
from py.__.test.rsession.rsession import RSession
 | 
						|
from py.__.test.rsession import repevent
 | 
						|
from py.__.test import collect
 | 
						|
from py.__.test.rsession.webdata import json
 | 
						|
 | 
						|
# directory next to this module that holds the static front-end files
# (index.html and the generated source.js are read from here)
DATADIR = py.path.local(__file__).dirpath("webdata")

# names handed to rpython2javascript() when webjs is translated to JavaScript
FUNCTION_LIST = [
    "main",
    "show_skip",
    "show_traceback",
    "show_info",
    "hide_info",
    "show_host",
    "hide_host",
    "hide_messagebox",
    "opt_scroll",
]
 | 
						|
 | 
						|
try:
 | 
						|
    from pypy.rpython.ootypesystem.bltregistry import MethodDesc, BasicExternal,\
 | 
						|
                                                      described
 | 
						|
    from pypy.translator.js.main import rpython2javascript
 | 
						|
    from pypy.translator.js import commproxy
 | 
						|
    from pypy.rpython.extfunc import _callable
 | 
						|
 | 
						|
    commproxy.USE_MOCHIKIT = False
 | 
						|
    IMPORTED_PYPY = True
 | 
						|
except (ImportError, NameError):
 | 
						|
    class BasicExternal(object):
 | 
						|
        pass
 | 
						|
 | 
						|
    def described(*args, **kwargs):
 | 
						|
        def decorator(func):
 | 
						|
            return func
 | 
						|
        return decorator
 | 
						|
 | 
						|
    def _callable(*args, **kwargs):
 | 
						|
        pass
 | 
						|
 | 
						|
    IMPORTED_PYPY = False
 | 
						|
 | 
						|
def add_item(event):
    """ Build the dictionary describing event.item for the web client

    The result holds the item's class name ('itemtype'), its name
    ('itemname'), its listnames() joined with "/" ('fullitemname') and
    'length': the number of entries item._tryiter() yields, as a string,
    or "?" when that number cannot be determined.
    """
    item = event.item
    d = {'fullitemname': "/".join(item.listnames()),
         'itemtype': item.__class__.__name__,
         'itemname': item.name}
    try:
        d['length'] = str(len(list(item._tryiter())))
    except (KeyboardInterrupt, SystemExit):
        # never hide an interrupt/exit request behind the "?" fallback
        raise
    except:
        # counting is best effort; any other failure just means "unknown"
        d['length'] = "?"
    return d
 | 
						|
 | 
						|
class MultiQueue(object):
    """ a tailor-made queue (internally using Queue) for py.test.rsession.web

        API-wise the main difference is that the get() method gets a sessid
        argument, which is used to determine what data to feed to the client

        when a data queue for a sessid doesn't yet exist, it is created, and
        filled with data that has already been fed to the other clients
    """
    def __init__(self):
        # every item ever put(), used to seed queues of late-joining sessions
        self._cache = []
        # sessid -> Queue.Queue holding the items not yet read by that session
        self._session_queues = {}
        # guards _cache and _session_queues
        self._lock = threading.Lock()

    def put(self, item):
        """ record item in the cache and append it to every session queue
        """
        self._lock.acquire()
        try:
            self._cache.append(item)
            for q in self._session_queues.values():
                q.put(item)
        finally:
            self._lock.release()

    def _del(self, sessid):
        """ forget the queue of sessid; unknown session ids are ignored
        """
        self._lock.acquire()
        try:
            self._session_queues.pop(sessid, None)
        finally:
            self._lock.release()

    def get(self, sessid):
        """ return the next item for sessid, blocking until one is available

            the session queue is created (and seeded from the cache) on
            first use
        """
        self._lock.acquire()
        try:
            if sessid not in self._session_queues:
                self._create_session_queue(sessid)
            # keep a reference so a concurrent _del() cannot turn the
            # lookup below into a KeyError
            q = self._session_queues[sessid]
        finally:
            self._lock.release()
        # wait outside the lock, otherwise put() could never deliver
        return q.get()

    def empty(self):
        """ tell whether nothing is left to read

            without any session queue this reflects the cache, otherwise
            it is True only if every session queue is empty
        """
        self._lock.acquire()
        try:
            if not self._session_queues:
                return not len(self._cache)
            for q in self._session_queues.values():
                if not q.empty():
                    return False
        finally:
            self._lock.release()
        return True

    def empty_queue(self, sessid):
        """ tell whether sessid has nothing left to read

            a session without a queue would be seeded from the cache on
            its first get(), so for it the cache decides (as in empty())
        """
        self._lock.acquire()
        try:
            q = self._session_queues.get(sessid)
            if q is None:
                return not len(self._cache)
            return q.empty()
        finally:
            self._lock.release()

    def _create_session_queue(self, sessid):
        """ create the queue for sessid and replay the cache into it

            must be called with self._lock held
        """
        self._session_queues[sessid] = q = Queue.Queue()
        for item in self._cache:
            q.put(item)
 | 
						|
 | 
						|
class ExportedMethods(BasicExternal):
 | 
						|
    _render_xmlhttp = True
 | 
						|
    def __init__(self):
 | 
						|
        self.pending_events = MultiQueue()
 | 
						|
        self.start_event = threading.Event()
 | 
						|
        self.end_event = threading.Event()
 | 
						|
        self.skip_reasons = {}
 | 
						|
        self.fail_reasons = {}
 | 
						|
        self.stdout = {}
 | 
						|
        self.stderr = {}
 | 
						|
        self.all = 0
 | 
						|
        self.to_rsync = {}
 | 
						|
    
 | 
						|
    def findmodule(self, item):
 | 
						|
        # find the most outwards parent which is module
 | 
						|
        current = item
 | 
						|
        while current:
 | 
						|
            if isinstance(current, collect.Module):
 | 
						|
                break
 | 
						|
            current = current.parent
 | 
						|
        
 | 
						|
        if current is not None:
 | 
						|
            return str(current.name), str("/".join(current.listnames()))
 | 
						|
        else:
 | 
						|
            return str(item.parent.name), str("/".join(item.parent.listnames()))
 | 
						|
    
 | 
						|
    def show_hosts(self):
 | 
						|
        self.start_event.wait()
 | 
						|
        to_send = {}
 | 
						|
        for host in self.hosts:
 | 
						|
            to_send[host.hostid] = host.hostname
 | 
						|
        return to_send
 | 
						|
    show_hosts = described(retval={str:str}, args=[('callback',
 | 
						|
            _callable([{str:str}]))])(show_hosts)
 | 
						|
    
 | 
						|
    def show_skip(self, item_name="aa"):
 | 
						|
        return {'item_name': item_name,
 | 
						|
                           'reason': self.skip_reasons[item_name]}
 | 
						|
    show_skip = described(retval={str:str}, args=[('item_name',str),('callback',
 | 
						|
            _callable([{str:str}]))])(show_skip)
 | 
						|
    
 | 
						|
    def show_fail(self, item_name="aa"):
 | 
						|
        return {'item_name':item_name,
 | 
						|
                           'traceback':str(self.fail_reasons[item_name]),
 | 
						|
                           'stdout':self.stdout[item_name],
 | 
						|
                           'stderr':self.stderr[item_name]}
 | 
						|
    show_fail = described(retval={str:str}, args=[('item_name',str),('callback',
 | 
						|
            _callable([{str:str}]))])(show_fail)
 | 
						|
    
 | 
						|
    _sessids = None
 | 
						|
    _sesslock = py.std.thread.allocate_lock()
 | 
						|
    def show_sessid(self):
 | 
						|
        if not self._sessids:
 | 
						|
            self._sessids = []
 | 
						|
        self._sesslock.acquire()
 | 
						|
        try:
 | 
						|
            while 1:
 | 
						|
                sessid = ''.join(py.std.random.sample(
 | 
						|
                                 py.std.string.lowercase, 8))
 | 
						|
                if sessid not in self._sessids:
 | 
						|
                    self._sessids.append(sessid)
 | 
						|
                    break
 | 
						|
        finally:
 | 
						|
            self._sesslock.release()
 | 
						|
        return sessid
 | 
						|
    show_sessid = described(retval=str, args=[('callback',
 | 
						|
            _callable([str]))])(show_sessid)
 | 
						|
    
 | 
						|
    def failed(self, **kwargs):
 | 
						|
        if not 'sessid' in kwargs:
 | 
						|
            return
 | 
						|
        sessid = kwargs['sessid']
 | 
						|
        to_del = -1
 | 
						|
        for num, i in enumerate(self._sessids):
 | 
						|
            if i == sessid:
 | 
						|
                to_del = num
 | 
						|
        if to_del != -1:
 | 
						|
            del self._sessids[to_del]
 | 
						|
        self.pending_events._del(kwargs['sessid'])
 | 
						|
    
 | 
						|
    def show_all_statuses(self, sessid=-1):
 | 
						|
        retlist = [self.show_status_change(sessid)]
 | 
						|
        while not self.pending_events.empty_queue(sessid):
 | 
						|
            retlist.append(self.show_status_change(sessid))
 | 
						|
        retval = retlist
 | 
						|
        return retval
 | 
						|
    show_all_statuses = described(retval=[{str:str}],args=
 | 
						|
         [('sessid',str), ('callback',_callable([[{str:str}]]))])(show_all_statuses)
 | 
						|
        
 | 
						|
    def show_status_change(self, sessid):
 | 
						|
        event = self.pending_events.get(sessid)
 | 
						|
        if event is None:
 | 
						|
            self.end_event.set()
 | 
						|
            return {}
 | 
						|
        # some dispatcher here
 | 
						|
        if isinstance(event, repevent.ReceivedItemOutcome):
 | 
						|
            args = {}
 | 
						|
            outcome = event.outcome
 | 
						|
            for key, val in outcome.__dict__.iteritems():
 | 
						|
                args[key] = str(val)
 | 
						|
            args.update(add_item(event))
 | 
						|
            mod_name, mod_fullname = self.findmodule(event.item)
 | 
						|
            args['modulename'] = str(mod_name)
 | 
						|
            args['fullmodulename'] = str(mod_fullname)
 | 
						|
            fullitemname = args['fullitemname']
 | 
						|
            if outcome.skipped:
 | 
						|
                self.skip_reasons[fullitemname] = outcome.skipped
 | 
						|
            elif outcome.excinfo:
 | 
						|
                self.fail_reasons[fullitemname] = self.repr_failure_tblong(
 | 
						|
                    event.item, outcome.excinfo, outcome.excinfo.traceback)
 | 
						|
                self.stdout[fullitemname] = outcome.stdout
 | 
						|
                self.stderr[fullitemname] = outcome.stderr
 | 
						|
            elif outcome.signal:
 | 
						|
                self.fail_reasons[fullitemname] = "Received signal %d" % outcome.signal
 | 
						|
                self.stdout[fullitemname] = outcome.stdout
 | 
						|
                self.stderr[fullitemname] = outcome.stderr
 | 
						|
            if event.channel:
 | 
						|
                args['hostkey'] = event.channel.gateway.host.hostid
 | 
						|
            else:
 | 
						|
                args['hostkey'] = ''
 | 
						|
        elif isinstance(event, repevent.ItemStart):
 | 
						|
            args = add_item(event)
 | 
						|
        elif isinstance(event, repevent.TestFinished):
 | 
						|
            args = {}
 | 
						|
            args['run'] = str(self.all)
 | 
						|
            args['fails'] = str(len(self.fail_reasons))
 | 
						|
            args['skips'] = str(len(self.skip_reasons))
 | 
						|
        elif isinstance(event, repevent.SendItem):
 | 
						|
            args = add_item(event)
 | 
						|
            args['hostkey'] = event.channel.gateway.host.hostid
 | 
						|
        elif isinstance(event, repevent.HostRSyncRootReady):
 | 
						|
            self.ready_hosts[event.host] = True
 | 
						|
            args = {'hostname' : event.host.hostname, 'hostkey' : event.host.hostid}
 | 
						|
        elif isinstance(event, repevent.FailedTryiter):
 | 
						|
            args = add_item(event)
 | 
						|
        elif isinstance(event, repevent.SkippedTryiter):
 | 
						|
            args = add_item(event)
 | 
						|
            args['reason'] = str(event.excinfo.value)
 | 
						|
        else:
 | 
						|
            args = {}
 | 
						|
        args['event'] = str(event)
 | 
						|
        args['type'] = event.__class__.__name__
 | 
						|
        return args
 | 
						|
 | 
						|
    def repr_failure_tblong(self, item, excinfo, traceback):
 | 
						|
        lines = []
 | 
						|
        for index, entry in py.builtin.enumerate(traceback):
 | 
						|
            lines.append('----------')
 | 
						|
            lines.append("%s: %s" % (entry.path, entry.lineno))
 | 
						|
            lines += self.repr_source(entry.relline, entry.source)
 | 
						|
        lines.append("%s: %s" % (excinfo.typename, excinfo.value))
 | 
						|
        return "\n".join(lines)
 | 
						|
    
 | 
						|
    def repr_source(self, relline, source):
 | 
						|
        lines = []
 | 
						|
        for num, line in enumerate(source.split("\n")):
 | 
						|
            if num == relline:
 | 
						|
                lines.append(">>>>" + line)
 | 
						|
            else:
 | 
						|
                lines.append("    " + line)
 | 
						|
        return lines
 | 
						|
 | 
						|
    def report_ReceivedItemOutcome(self, event):
 | 
						|
        self.all += 1
 | 
						|
        self.pending_events.put(event)
 | 
						|
    
 | 
						|
    def report_ItemStart(self, event):
 | 
						|
        if isinstance(event.item, py.test.collect.Module):
 | 
						|
            self.pending_events.put(event)
 | 
						|
    
 | 
						|
    def report_unknown(self, event):
 | 
						|
        # XXX: right now, we just pass it for showing
 | 
						|
        self.pending_events.put(event)
 | 
						|
 | 
						|
    def _host_ready(self, event):
 | 
						|
        self.pending_events.put(event)
 | 
						|
 | 
						|
    def report_HostGatewayReady(self, item):
 | 
						|
        self.to_rsync[item.host] = len(item.roots)
 | 
						|
 | 
						|
    def report_HostRSyncRootReady(self, item):
 | 
						|
        self.to_rsync[item.host] -= 1
 | 
						|
        if not self.to_rsync[item.host]:
 | 
						|
            self._host_ready(item)
 | 
						|
 | 
						|
    
 | 
						|
    def report_TestStarted(self, event):
 | 
						|
        # XXX: It overrides out self.hosts
 | 
						|
        self.hosts = {}
 | 
						|
        self.ready_hosts = {}
 | 
						|
        for host in event.hosts:
 | 
						|
            self.hosts[host] = host
 | 
						|
            self.ready_hosts[host] = False
 | 
						|
        self.start_event.set()
 | 
						|
        self.pending_events.put(event)
 | 
						|
    
 | 
						|
    def report(self, what):
 | 
						|
        repfun = getattr(self, "report_" + what.__class__.__name__,
 | 
						|
                         self.report_unknown)
 | 
						|
        try:
 | 
						|
            repfun(what)
 | 
						|
        except (KeyboardInterrupt, SystemExit):
 | 
						|
            raise
 | 
						|
        except:
 | 
						|
            print "Internal reporting problem"
 | 
						|
            excinfo = py.code.ExceptionInfo()
 | 
						|
            for i in excinfo.traceback:
 | 
						|
                print str(i)[2:-1]
 | 
						|
            print excinfo
 | 
						|
 | 
						|
##        try:
 | 
						|
##            self.wait_flag.acquire()
 | 
						|
##            self.pending_events.insert(0, event)
 | 
						|
##            self.wait_flag.notify()
 | 
						|
##        finally:
 | 
						|
##            self.wait_flag.release()
 | 
						|
 | 
						|
# single module-level instance: TestHandler serves requests from it and
# kill_server() uses it to signal and await the end of the run
exported_methods = ExportedMethods()
 | 
						|
 | 
						|
class TestHandler(BaseHTTPRequestHandler):
    """ HTTP request handler of the py.test web front-end

        a request for /<name> is answered by the handler's own run_<name>
        method if there is one, otherwise by calling the method <name> of
        exported_methods with the query arguments as keywords and sending
        the result back as JSON; dots in <name> are read as underscores
    """
    exported_methods = exported_methods

    def do_GET(self):
        """ dispatch a request, see the class docstring (POST is treated
            the same way)
        """
        path = self.path
        # take the query string off first, so that trimming slashes cannot
        # eat into it and a "?" inside a value does not move the split
        if "?" in path:
            path, getargs = path.split("?", 1)
        else:
            getargs = ""
        if path.endswith("/"):
            path = path[:-1]
        if path.startswith("/"):
            path = path[1:]
        name_path = path.replace(".", "_")

        method_to_call = getattr(self, "run_" + name_path, None)
        if method_to_call is not None:
            method_to_call()
            return

        # NOTE(review): every public method of exported_methods stays
        # reachable from the network (report_*, failed, ...); only private
        # names and non-callable attributes are refused here
        exec_meth = None
        if not name_path.startswith("_"):
            exec_meth = getattr(self.exported_methods, name_path, None)
        if exec_meth is None or not callable(exec_meth):
            self.send_error(404, "File %s not found" % path)
            return

        kwargs = self.parse_args(getargs)
        try:
            self.serve_data('text/json', json.write(exec_meth(**kwargs)))
        except socket.error:
            # client happily disconnected
            self.exported_methods.failed(**kwargs)

    def parse_args(self, getargs):
        """ turn a query string ("a=1&b=2") into a dictionary of unquoted
            keys and values

            only the first "=" of a pair separates key from value, a key
            without "=" gets the value "" and empty pairs are skipped
        """
        if getargs == "":
            return {}

        unquote = py.std.urllib.unquote
        args = {}
        for arg in getargs.split("&"):
            if not arg:
                continue
            if "=" in arg:
                key, value = arg.split("=", 1)
            else:
                key, value = arg, ""
            args[unquote(key)] = unquote(value)
        return args

    def log_message(self, format, *args):
        # XXX just discard it
        pass

    do_POST = do_GET

    def run_(self):
        # the bare root URL shows the index page
        self.run_index()

    def run_index(self):
        """ serve index.html from DATADIR
        """
        f = py.path.local(DATADIR).join("index.html").open()
        try:
            data = f.read()
        finally:
            f.close()
        self.serve_data("text/html", data)

    def run_jssource(self):
        """ serve webdata/source.js

            when pypy could be imported and webjs.py is newer than
            source.js, the file is regenerated from webjs first
        """
        js_name = py.path.local(__file__).dirpath("webdata").join("source.js")
        web_name = py.path.local(__file__).dirpath().join("webjs.py")
        if IMPORTED_PYPY and web_name.mtime() > js_name.mtime():
            from py.__.test.rsession import webjs

            js_source = rpython2javascript(webjs,
                FUNCTION_LIST, use_pdb=False)
            f = open(str(js_name), "w")
            try:
                f.write(js_source)
            finally:
                f.close()
        else:
            f = open(str(js_name), "r")
            try:
                js_source = f.read()
            finally:
                f.close()
        self.serve_data("text/javascript", js_source)

    def serve_data(self, content_type, data):
        """ send data as a complete 200 response of the given content type
        """
        self.send_response(200)
        self.send_header("Content-type", content_type)
        self.send_header("Content-length", str(len(data)))
        self.end_headers()
        self.wfile.write(data)
 | 
						|
 | 
						|
def start_server(server_address = ('', 8000), handler=TestHandler, start_new=True):
 | 
						|
    httpd = HTTPServer(server_address, handler)
 | 
						|
 | 
						|
    if start_new:
 | 
						|
        thread.start_new_thread(httpd.serve_forever, ())
 | 
						|
        print "Server started, listening on port %d" % (httpd.server_port,)
 | 
						|
        return httpd
 | 
						|
    else:
 | 
						|
        print "Server started, listening on port %d" % (httpd.server_port,)
 | 
						|
        httpd.serve_forever()
 | 
						|
 | 
						|
def kill_server():
    """ queue the None end marker, poll until the pending events are
        drained and then wait for end_event to be set
    """
    events = exported_methods.pending_events
    events.put(None)
    while 1:
        if events.empty():
            break
        time.sleep(.1)
    exported_methods.end_event.wait()
 | 
						|
 |