[svn r57321] merging the event branch:

* moving in test, misc, code, io directories and
  py/__init__.py
* py/bin/_find.py does not print to stderr anymore
* a few fixes to conftest files in other dirs
some more fixes and adjustments pending

--HG--
branch : trunk
This commit is contained in:
hpk
2008-08-16 17:26:59 +02:00
parent 7428eadf7d
commit abc8cf09aa
187 changed files with 27242 additions and 18 deletions

1
py/misc/__init__.py Normal file
View File

@@ -0,0 +1 @@
#

203
py/misc/_dist.py Normal file
View File

@@ -0,0 +1,203 @@
import py
import sys, os, re
from distutils import sysconfig
from distutils import core
winextensions = 1
if sys.platform == 'win32':
try:
import _winreg, win32gui, win32con
except ImportError:
winextensions = 0
class Params:
""" a crazy hack to convince distutils to please
install all of our files inside the package.
"""
_sitepackages = py.path.local(sysconfig.get_python_lib())
def __init__(self, pkgmod):
name = pkgmod.__name__
self._pkgdir = py.path.local(pkgmod.__file__).dirpath()
self._rootdir = self._pkgdir.dirpath()
self._pkgtarget = self._sitepackages.join(name)
self._datadict = {}
self.packages = []
self.scripts = []
self.hacktree()
self.data_files = self._datadict.items()
self.data_files.sort()
self.packages.sort()
self.scripts.sort()
def hacktree(self):
for p in self._pkgdir.visit(None, lambda x: x.basename != '.svn'):
if p.check(file=1):
if p.ext in ('.pyc', '.pyo'):
continue
if p.dirpath().basename == 'bin':
self.scripts.append(p.relto(self._rootdir))
self.adddatafile(p)
elif p.ext == '.py':
self.addpythonfile(p)
else:
self.adddatafile(p)
#else:
# if not p.listdir():
# self.adddatafile(p.ensure('dummy'))
def adddatafile(self, p):
if p.ext in ('.pyc', 'pyo'):
return
target = self._pkgtarget.join(p.dirpath().relto(self._pkgdir))
l = self._datadict.setdefault(str(target), [])
l.append(p.relto(self._rootdir))
def addpythonfile(self, p):
parts = p.parts()
for above in p.parts(reverse=True)[1:]:
if self._pkgdir.relto(above):
dottedname = p.dirpath().relto(self._rootdir).replace(p.sep, '.')
if dottedname not in self.packages:
self.packages.append(dottedname)
break
if not above.join('__init__.py').check():
self.adddatafile(p)
#print "warning, added data file", p
break
#if sys.platform != 'win32':
# scripts.remove('py/bin/pytest.cmd')
#else:
# scripts.remove('py/bin/py.test')
#
### helpers:
def checknonsvndir(p):
if p.basename != '.svn' and p.check(dir=1):
return True
def dump(params):
print "packages"
for x in params.packages:
print "package ", x
print
print "scripts"
for x in params.scripts:
print "script ", x
print
print "data files"
for x in params.data_files:
print "data file ", x
print
def addbindir2path():
if sys.platform != 'win32' or not winextensions:
return
# Add py/bin to PATH environment variable
bindir = os.path.join(sysconfig.get_python_lib(), "py", "bin", "win32")
# check for the user path
ureg = _winreg.ConnectRegistry(None, _winreg.HKEY_CURRENT_USER)
ukey = r"Environment"
# not every user has his own path on windows
try:
upath = get_registry_value(ureg, ukey, "PATH")
except WindowsError:
upath=""
# if bindir allready in userpath -> do nothing
if bindir in upath:
return
reg = _winreg.ConnectRegistry(None, _winreg.HKEY_LOCAL_MACHINE)
key = r"SYSTEM\CurrentControlSet\Control\Session Manager\Environment"
path = get_registry_value(reg, key, "Path")
# if bindir allready in systempath -> do nothing
if bindir in path:
return
path += ";" + bindir
print "Setting PATH to:", path
pathset=False
try:
set_registry_value(reg, key, "PATH", path)
pathset=True
except WindowsError:
print "cannot set systempath, falling back to userpath"
pass
if not pathset:
try:
if len(upath)>0: #if no user path present
upath += ";"
upath+=bindir
set_registry_value(ureg, ukey, "Path", upath)
pathset=True
except WindowsError:
print "cannot set userpath, please add %s to your path" % (bindir,)
return
#print "Current PATH is:", get_registry_value(reg, key, "Path")
# Propagate changes throughout the system
win32gui.SendMessageTimeout(win32con.HWND_BROADCAST,
win32con.WM_SETTINGCHANGE, 0, "Environment",
win32con.SMTO_ABORTIFHUNG, 5000)
# Propagate changes to current command prompt
os.system("set PATH=%s" % path)
def get_registry_value(reg, key, value_name):
k = _winreg.OpenKey(reg, key)
value = _winreg.QueryValueEx(k, value_name)[0]
_winreg.CloseKey(k)
return value
def set_registry_value(reg, key, value_name, value):
k = _winreg.OpenKey(reg, key, 0, _winreg.KEY_WRITE)
value_type = _winreg.REG_SZ
# if we handle the Path value, then set its type to REG_EXPAND_SZ
# so that things like %SystemRoot% get automatically expanded by the
# command prompt
if value_name == "Path":
value_type = _winreg.REG_EXPAND_SZ
_winreg.SetValueEx(k, value_name, 0, value_type, value)
_winreg.CloseKey(k)
### end helpers
def setup(pkg, **kw):
""" invoke distutils on a given package.
"""
if 'install' in sys.argv[1:]:
print "precompiling greenlet module"
try:
x = py.magic.greenlet()
except (RuntimeError, ImportError):
print "could not precompile greenlet module, skipping"
params = Params(pkg)
#dump(params)
source = getattr(pkg, '__pkg__', pkg)
namelist = list(core.setup_keywords)
namelist.extend(['packages', 'scripts', 'data_files'])
for name in namelist:
for ns in (source, params):
if hasattr(ns, name):
kw[name] = getattr(ns, name)
break
#script_args = sys.argv[1:]
#if 'install' in script_args:
# script_args = ['--quiet'] + script_args
# #print "installing", py
#py.std.pprint.pprint(kw)
core.setup(**kw)
if 'install' in sys.argv[1:]:
addbindir2path()
x = params._rootdir.join('build')
if x.check():
print "removing", x
x.remove()

84
py/misc/buildcmodule.py Normal file
View File

@@ -0,0 +1,84 @@
"""
A utility to build a Python extension module from C, wrapping distutils.
"""
import py
# XXX we should distutils in a subprocess, because it messes up the
# environment and who knows what else. Currently we just save
# and restore os.environ.
def make_module_from_c(cfile):
import os, sys, imp
from distutils.core import setup
from distutils.extension import Extension
debug = 0
#try:
# from distutils.log import set_threshold
# set_threshold(10000)
#except ImportError:
# print "ERROR IMPORTING"
# pass
dirpath = cfile.dirpath()
modname = cfile.purebasename
# find the expected extension of the compiled C module
for ext, mode, filetype in imp.get_suffixes():
if filetype == imp.C_EXTENSION:
break
else:
raise ImportError, "cannot find the file name suffix of C ext modules"
lib = dirpath.join(modname+ext)
# XXX argl! we need better "build"-locations alltogether!
if lib.check():
try:
lib.remove()
except EnvironmentError:
pass # XXX we just use the existing version, bah
if not lib.check():
c = py.io.StdCaptureFD()
try:
try:
saved_environ = os.environ.items()
try:
lastdir = dirpath.chdir()
try:
setup(
name = "pylibmodules",
ext_modules=[
Extension(modname, [str(cfile)])
],
script_name = 'setup.py',
script_args = ['-q', 'build_ext', '--inplace']
#script_args = ['build_ext', '--inplace']
)
finally:
lastdir.chdir()
finally:
for key, value in saved_environ:
if os.environ.get(key) != value:
os.environ[key] = value
finally:
foutput, foutput = c.done()
except KeyboardInterrupt:
raise
except SystemExit, e:
raise RuntimeError("cannot compile %s: %s\n%s" % (cfile, e,
foutput.read()))
# XXX do we need to do some check on fout/ferr?
# XXX not a nice way to import a module
if debug:
print "inserting path to sys.path", dirpath
sys.path.insert(0, str(dirpath))
if debug:
print "import %(modname)s as testmodule" % locals()
exec py.code.compile("import %(modname)s as testmodule" % locals())
try:
sys.path.remove(str(dirpath))
except ValueError:
pass
return testmodule

157
py/misc/cache.py Normal file
View File

@@ -0,0 +1,157 @@
"""
This module contains multithread-safe cache implementations.
Caches mainly have a
__getitem__ and getorbuild() method
where the latter either just return a cached value or
first builds the value.
These are the current cache implementations:
BuildcostAccessCache tracks building-time and accesses. Evicts
by product of num-accesses * build-time.
"""
import py
gettime = py.std.time.time
class WeightedCountingEntry(object):
def __init__(self, value, oneweight):
self.num = 1
self._value = value
self.oneweight = oneweight
def weight():
def fget(self):
return (self.num * self.oneweight, self.num)
return property(fget, None, None, "cumulative weight")
weight = weight()
def value():
def fget(self):
# you need to protect against mt-access at caller side!
self.num += 1
return self._value
return property(fget, None, None)
value = value()
def __repr__(self):
return "<%s weight=%s>" % (self.__class__.__name__, self.weight)
class BasicCache(object):
def __init__(self, maxentries=128):
self.maxentries = maxentries
self.prunenum = int(maxentries - maxentries/8)
self._lock = py.std.threading.RLock()
self._dict = {}
def getentry(self, key):
lock = self._lock
lock.acquire()
try:
return self._dict.get(key, None)
finally:
lock.release()
def putentry(self, key, entry):
self._lock.acquire()
try:
self._prunelowestweight()
self._dict[key] = entry
finally:
self._lock.release()
def delentry(self, key, raising=False):
self._lock.acquire()
try:
try:
del self._dict[key]
except KeyError:
if raising:
raise
finally:
self._lock.release()
def getorbuild(self, key, builder, *args, **kwargs):
entry = self.getentry(key)
if entry is None:
entry = self.build(key, builder, *args, **kwargs)
return entry.value
def _prunelowestweight(self):
""" prune out entries with lowest weight. """
# note: must be called with acquired self._lock!
numentries = len(self._dict)
if numentries >= self.maxentries:
# evict according to entry's weight
items = [(entry.weight, key) for key, entry in self._dict.iteritems()]
items.sort()
index = numentries - self.prunenum
if index > 0:
for weight, key in items[:index]:
del self._dict[key]
class BuildcostAccessCache(BasicCache):
""" A BuildTime/Access-counting cache implementation.
the weight of a value is computed as the product of
num-accesses-of-a-value * time-to-build-the-value
The values with the least such weights are evicted
if the cache maxentries threshold is superceded.
For implementation flexibility more than one object
might be evicted at a time.
"""
# time function to use for measuring build-times
_time = gettime
def __init__(self, maxentries=64):
super(BuildcostAccessCache, self).__init__(maxentries)
def build(self, key, builder, *args, **kwargs):
start = self._time()
val = builder(*args, **kwargs)
end = self._time()
entry = WeightedCountingEntry(val, end-start)
self.putentry(key, entry)
return entry
class AgingCache(BasicCache):
""" This cache prunes out cache entries that are too old.
"""
def __init__(self, maxentries=128, maxseconds=10.0):
super(AgingCache, self).__init__(maxentries)
self.maxseconds = maxseconds
def getentry(self, key):
self._lock.acquire()
try:
try:
entry = self._dict[key]
except KeyError:
entry = None
else:
if entry.isexpired():
del self._dict[key]
entry = None
return entry
finally:
self._lock.release()
def build(self, key, builder, *args, **kwargs):
ctime = gettime()
val = builder(*args, **kwargs)
entry = AgingEntry(val, ctime + self.maxseconds)
self.putentry(key, entry)
return entry
class AgingEntry(object):
def __init__(self, value, expirationtime):
self.value = value
self.weight = expirationtime
def isexpired(self):
t = py.std.time.time()
return t >= self.weight

View File

@@ -0,0 +1 @@
#

84
py/misc/cmdline/countloc.py Executable file
View File

@@ -0,0 +1,84 @@
#!/usr/bin/env python
# hands on script to compute the non-empty Lines of Code
# for tests and non-test code
import py
curdir = py.path.local()
def nodot(p):
return p.check(dotfile=0)
class FileCounter(object):
def __init__(self):
self.file2numlines = {}
self.numlines = 0
self.numfiles = 0
def addrecursive(self, directory, fil="*.py", rec=nodot):
for x in directory.visit(fil, rec):
self.addfile(x)
def addfile(self, fn, emptylines=False):
if emptylines:
s = len(p.readlines())
else:
s = 0
for i in fn.readlines():
if i.strip():
s += 1
self.file2numlines[fn] = s
self.numfiles += 1
self.numlines += s
def getnumlines(self, fil):
numlines = 0
for path, value in self.file2numlines.items():
if fil(path):
numlines += value
return numlines
def getnumfiles(self, fil):
numfiles = 0
for path in self.file2numlines:
if fil(path):
numfiles += 1
return numfiles
def get_loccount(locations=None):
if locations is None:
localtions = [py.path.local()]
counter = FileCounter()
for loc in locations:
counter.addrecursive(loc, '*.py', rec=nodot)
def istestfile(p):
return p.check(fnmatch='test_*.py')
isnottestfile = lambda x: not istestfile(x)
numfiles = counter.getnumfiles(isnottestfile)
numlines = counter.getnumlines(isnottestfile)
numtestfiles = counter.getnumfiles(istestfile)
numtestlines = counter.getnumlines(istestfile)
return counter, numfiles, numlines, numtestfiles, numtestlines
def countloc(paths=None):
if not paths:
paths = ['.']
locations = [py.path.local(x) for x in paths]
(counter, numfiles, numlines, numtestfiles,
numtestlines) = get_loccount(locations)
items = counter.file2numlines.items()
items.sort(lambda x,y: cmp(x[1], y[1]))
for x, y in items:
print "%3d %30s" % (y,x)
print "%30s %3d" %("number of testfiles", numtestfiles)
print "%30s %3d" %("number of non-empty testlines", numtestlines)
print "%30s %3d" %("number of files", numfiles)
print "%30s %3d" %("number of non-empty lines", numlines)

View File

@@ -0,0 +1,63 @@
"""
Put this file as 'conftest.py' somewhere upwards from py-trunk,
modify the "socketserveradr" below to point to a windows/linux
host running "py/execnet/script/loop_socketserver.py"
and invoke e.g. from linux:
py.test --session=MySession some_path_to_what_you_want_to_test
This should ad-hoc distribute the running of tests to
the remote machine (including rsyncing your WC).
"""
import py
from py.__.test.looponfail.remote import LooponfailingSession
import os
class MyRSync(py.execnet.RSync):
def filter(self, path):
if path.endswith('.pyc') or path.endswith('~'):
return False
dir, base = os.path.split(path)
# we may want to have revision info on the other side,
# so let's not exclude .svn directories
#if base == '.svn':
# return False
return True
class MySession(LooponfailingSession):
socketserveradr = ('10.9.2.62', 8888)
socketserveradr = ('10.9.4.148', 8888)
def _initslavegateway(self):
print "MASTER: initializing remote socket gateway"
gw = py.execnet.SocketGateway(*self.socketserveradr)
pkgname = 'py' # xxx flexibilize
channel = gw.remote_exec("""
import os
topdir = os.path.join(os.environ['HOMEPATH'], 'pytestcache')
pkgdir = os.path.join(topdir, %r)
channel.send((topdir, pkgdir))
""" % (pkgname,))
remotetopdir, remotepkgdir = channel.receive()
sendpath = py.path.local(py.__file__).dirpath()
rsync = MyRSync(sendpath)
rsync.add_target(gw, remotepkgdir, delete=True)
rsync.send()
channel = gw.remote_exec("""
import os, sys
path = %r # os.path.abspath
sys.path.insert(0, path)
os.chdir(path)
import py
channel.send((path, py.__file__))
""" % remotetopdir)
topdir, remotepypath = channel.receive()
assert topdir == remotetopdir, (topdir, remotetopdir)
assert remotepypath.startswith(topdir), (remotepypath, topdir)
#print "remote side has rsynced pythonpath ready: %r" %(topdir,)
return gw, topdir
dist_hosts = ['localhost', 'cobra', 'cobra']

32
py/misc/difftime.py Normal file
View File

@@ -0,0 +1,32 @@
import py
_time_desc = {
1 : 'second', 60 : 'minute', 3600 : 'hour', 86400 : 'day',
2628000 : 'month', 31536000 : 'year', }
def worded_diff_time(ctime):
difftime = py.std.time.time() - ctime
keys = _time_desc.keys()
keys.sort()
for i, key in py.builtin.enumerate(keys):
if key >=difftime:
break
l = []
keylist = keys[:i]
keylist.reverse()
for key in keylist[:1]:
div = int(difftime / key)
if div==0:
break
difftime -= div * key
plural = div > 1 and 's' or ''
l.append('%d %s%s' %(div, _time_desc[key], plural))
return ", ".join(l) + " ago "
_months = ['Jan', 'Feb', 'Mar', 'Apr', 'May', 'Jun',
'Jul', 'Aug', 'Sep', 'Oct', 'Nov', 'Dec']
def worded_time(ctime):
tm = py.std.time.gmtime(ctime)
return "%s %d, %d" % (_months[tm.tm_mon-1], tm.tm_mday, tm.tm_year)

84
py/misc/dynpkg.py Normal file
View File

@@ -0,0 +1,84 @@
"""
"""
import py
import sys
log = py.log.get("dynpkg",
info=py.log.STDOUT,
debug=py.log.STDOUT,
command=None) # py.log.STDOUT)
from distutils import util
class DistPython:
def __init__(self, location=None, python=None):
if python is None:
python = py.std.sys.executable
self.python = python
if location is None:
location = py.path.local()
self.location = location
self.plat_specifier = '.%s-%s' % (util.get_platform(), sys.version[0:3])
def clean(self):
out = self._exec("clean -a")
#print out
def build(self):
out = self._exec("build")
#print out
def _exec(self, cmd):
python = self.python
old = self.location.chdir()
try:
cmd = "%(python)s setup.py %(cmd)s" % locals()
log.command(cmd)
out = py.process.cmdexec(cmd)
finally:
old.chdir()
return out
def get_package_path(self, pkgname):
pkg = self._get_package_path(pkgname)
if pkg is None:
#self.clean()
self.build()
pkg = self._get_package_path(pkgname)
assert pkg is not None
return pkg
def _get_package_path(self, pkgname):
major, minor = py.std.sys.version_info[:2]
#assert major >=2 and minor in (3,4,5)
suffix = "%s.%s" %(major, minor)
location = self.location
for base in [location.join('build', 'lib'),
location.join('build', 'lib'+ self.plat_specifier)]:
if base.check(dir=1):
for pkg in base.visit(lambda x: x.check(dir=1)):
if pkg.basename == pkgname:
#
if pkg.dirpath().basename == 'lib'+ self.plat_specifier or \
pkg.dirpath().basename == 'lib':
return pkg
def setpkg(finalpkgname, distdir):
assert distdir.check(dir=1)
dist = DistPython(distdir)
pkg = dist.get_package_path(finalpkgname)
assert pkg.check(dir=1)
sys.path.insert(0, str(pkg.dirpath()))
try:
modname = pkg.purebasename
if modname in sys.modules:
log.debug("removing from sys.modules:", modname)
del sys.modules[modname]
sys.modules[modname] = mod = __import__(modname)
finally:
sys.path[0] # XXX
log.info("module is at", mod.__file__)
return mod

79
py/misc/error.py Normal file
View File

@@ -0,0 +1,79 @@
import py
import errno
class Error(EnvironmentError):
__module__ = 'py.error'
def __repr__(self):
return "%s.%s %r: %s " %(self.__class__.__module__,
self.__class__.__name__,
self.__class__.__doc__,
" ".join(map(str, self.args)),
#repr(self.args)
)
def __str__(self):
return "[%s]: %s" %(self.__class__.__doc__,
" ".join(map(str, self.args)),
)
_winerrnomap = {
2: errno.ENOENT,
3: errno.ENOENT,
17: errno.EEXIST,
22: errno.ENOTDIR,
267: errno.ENOTDIR,
5: errno.EACCES, # anything better?
}
# note: 'py.std' may not be imported yet at all, because
# the 'error' module in this file is imported very early.
# This is dependent on dict order.
ModuleType = type(py)
class py_error(ModuleType):
""" py.error lazily provides higher level Exception classes
for each possible POSIX errno (as defined per
the 'errno' module. All such Exceptions derive
from py.error.Error, which itself is a subclass
of EnvironmentError.
"""
Error = Error
def _getwinerrnoclass(cls, eno):
return cls._geterrnoclass(_winerrnomap[eno])
_getwinerrnoclass = classmethod(_getwinerrnoclass)
def _geterrnoclass(eno, _errno2class = {}):
try:
return _errno2class[eno]
except KeyError:
clsname = py.std.errno.errorcode.get(eno, "UnknownErrno%d" %(eno,))
cls = type(Error)(clsname, (Error,),
{'__module__':'py.error',
'__doc__': py.std.os.strerror(eno)})
_errno2class[eno] = cls
return cls
_geterrnoclass = staticmethod(_geterrnoclass)
def __getattr__(self, name):
eno = getattr(py.std.errno, name)
cls = self._geterrnoclass(eno)
setattr(self, name, cls)
return cls
def getdict(self, done=[]):
try:
return done[0]
except IndexError:
for name in py.std.errno.errorcode.values():
hasattr(self, name) # force attribute to be loaded, ignore errors
dictdescr = ModuleType.__dict__['__dict__']
done.append(dictdescr.__get__(self))
return done[0]
__dict__ = property(getdict)
del getdict
error = py_error('py.error', py_error.__doc__)

View File

@@ -0,0 +1,76 @@
#!/usr/bin/env python
import py
import inspect
import types
def report_strange_docstring(name, obj):
if obj.__doc__ is None:
print "%s misses a docstring" % (name, )
elif obj.__doc__ == "":
print "%s has an empty" % (name, )
elif "XXX" in obj.__doc__:
print "%s has an 'XXX' in its docstring" % (name, )
def find_code(method):
return getattr(getattr(method, "im_func", None), "func_code", None)
def report_different_parameter_names(name, cls):
bases = cls.__mro__
for base in bases:
for attr in dir(base):
meth1 = getattr(base, attr)
code1 = find_code(meth1)
if code1 is None:
continue
if not callable(meth1):
continue
if not hasattr(cls, attr):
continue
meth2 = getattr(cls, attr)
code2 = find_code(meth2)
if not callable(meth2):
continue
if code2 is None:
continue
args1 = inspect.getargs(code1)[0]
args2 = inspect.getargs(code2)[0]
for a1, a2 in zip(args1, args2):
if a1 != a2:
print "%s.%s have different argument names %s, %s than the version in %s" % (name, attr, a1, a2, base)
def find_all_exported():
stack = [(name, getattr(py, name)) for name in dir(py)[::-1]
if not name.startswith("_") and name != "compat"]
seen = {}
exported = []
while stack:
name, obj = stack.pop()
if id(obj) in seen:
continue
else:
seen[id(obj)] = True
exported.append((name, obj))
if isinstance(obj, type) or isinstance(obj, type(py)):
stack.extend([("%s.%s" % (name, s), getattr(obj, s)) for s in dir(obj)
if len(s) <= 1 or not (s[0] == '_' and s[1] != '_')])
return exported
if __name__ == '__main__':
all_exported = find_all_exported()
print "strange docstrings"
print "=================="
print
for name, obj in all_exported:
if callable(obj):
report_strange_docstring(name, obj)
print "\n\ndifferent parameters"
print "===================="
print
for name, obj in all_exported:
if isinstance(obj, type):
report_different_parameter_names(name, obj)

10
py/misc/killproc.py Normal file
View File

@@ -0,0 +1,10 @@
import py
import os, sys
def killproc(pid):
if sys.platform == "win32":
py.process.cmdexec("taskkill /F /PID %d" %(pid,))
else:
os.kill(pid, 15)

73
py/misc/rest.py Normal file
View File

@@ -0,0 +1,73 @@
import py
import sys, os, traceback
import re
if hasattr(sys.stdout, 'fileno') and os.isatty(sys.stdout.fileno()):
def log(msg):
print msg
else:
def log(msg):
pass
def convert_rest_html(source, source_path, stylesheet=None, encoding='latin1'):
from py.__.rest import directive
""" return html latin1-encoded document for the given input.
source a ReST-string
sourcepath where to look for includes (basically)
stylesheet path (to be used if any)
"""
from docutils.core import publish_string
directive.set_backend_and_register_directives("html")
kwargs = {
'stylesheet' : stylesheet,
'stylesheet_path': None,
'traceback' : 1,
'embed_stylesheet': 0,
'output_encoding' : encoding,
#'halt' : 0, # 'info',
'halt_level' : 2,
}
# docutils uses os.getcwd() :-(
source_path = os.path.abspath(str(source_path))
prevdir = os.getcwd()
try:
os.chdir(os.path.dirname(source_path))
return publish_string(source, source_path, writer_name='html',
settings_overrides=kwargs)
finally:
os.chdir(prevdir)
def process(txtpath, encoding='latin1'):
""" process a textfile """
log("processing %s" % txtpath)
assert txtpath.check(ext='.txt')
if isinstance(txtpath, py.path.svnwc):
txtpath = txtpath.localpath
htmlpath = txtpath.new(ext='.html')
#svninfopath = txtpath.localpath.new(ext='.svninfo')
style = txtpath.dirpath('style.css')
if style.check():
stylesheet = style.basename
else:
stylesheet = None
content = unicode(txtpath.read(), encoding)
doc = convert_rest_html(content, txtpath, stylesheet=stylesheet, encoding=encoding)
htmlpath.write(doc)
#log("wrote %r" % htmlpath)
#if txtpath.check(svnwc=1, versioned=1):
# info = txtpath.info()
# svninfopath.dump(info)
rex1 = re.compile(ur'.*<body>(.*)</body>.*', re.MULTILINE | re.DOTALL)
rex2 = re.compile(ur'.*<div class="document">(.*)</div>.*', re.MULTILINE | re.DOTALL)
def strip_html_header(string, encoding='utf8'):
""" return the content of the body-tag """
uni = unicode(string, encoding)
for rex in rex1,rex2:
match = rex.search(uni)
if not match:
break
uni = match.group(1)
return uni

19
py/misc/std.py Normal file
View File

@@ -0,0 +1,19 @@
import sys
class Std(object):
""" makes all standard python modules available as a lazily
computed attribute.
"""
def __init__(self):
self.__dict__ = sys.modules
def __getattr__(self, name):
try:
m = __import__(name)
except ImportError:
raise AttributeError("py.std: could not import %s" % name)
return m
std = Std()

34
py/misc/svnlook.py Normal file
View File

@@ -0,0 +1,34 @@
import py
class ChangeItem:
def __init__(self, repo, revision, line):
self.repo = py.path.local(repo)
self.revision = int(revision)
self.action = action = line[:4]
self.path = line[4:].strip()
self.added = action[0] == "A"
self.modified = action[0] == "M"
self.propchanged = action[1] == "U"
self.deleted = action[0] == "D"
def svnurl(self):
return py.path.svnurl("file://%s/%s" %(self.repo, self.path), self.revision)
def __repr__(self):
return "<ChangeItem %r>" %(self.action + self.path)
def changed(repo, revision):
out = py.process.cmdexec("svnlook changed -r %s %s" %(revision, repo))
l = []
for line in out.strip().split('\n'):
l.append(ChangeItem(repo, revision, line))
return l
def author(repo, revision):
out = py.process.cmdexec("svnlook author -r %s %s" %(revision, repo))
return out.strip()
def youngest(repo):
out = py.process.cmdexec("svnlook youngest %s" %(repo,))
return int(out)

132
py/misc/terminal_helper.py Normal file
View File

@@ -0,0 +1,132 @@
"""
Helper functions for writing to terminals and files.
"""
import sys, os
import py
def get_terminal_width():
try:
import termios,fcntl,struct
call = fcntl.ioctl(0,termios.TIOCGWINSZ,"\000"*8)
height,width = struct.unpack( "hhhh", call ) [:2]
terminal_width = width
except (SystemExit, KeyboardInterrupt), e:
raise
except:
# FALLBACK
terminal_width = int(os.environ.get('COLUMNS', 80))-1
return terminal_width
terminal_width = get_terminal_width()
def ansi_print(text, esc, file=None, newline=True, flush=False):
if file is None:
file = sys.stderr
text = text.rstrip()
if esc and sys.platform != "win32" and file.isatty():
if not isinstance(esc, tuple):
esc = (esc,)
text = (''.join(['\x1b[%sm' % cod for cod in esc]) +
text +
'\x1b[0m') # ANSI color code "reset"
if newline:
text += '\n'
file.write(text)
if flush:
file.flush()
class Out(object):
tty = False
def __init__(self, file):
self.file = py.io.dupfile(file)
self.fullwidth = get_terminal_width()
def sep(self, sepchar, title=None, fullwidth=None):
if not fullwidth:
fullwidth = self.fullwidth
# the goal is to have the line be as long as possible
# under the condition that len(line) <= fullwidth
if title is not None:
# we want 2 + 2*len(fill) + len(title) <= fullwidth
# i.e. 2 + 2*len(sepchar)*N + len(title) <= fullwidth
# 2*len(sepchar)*N <= fullwidth - len(title) - 2
# N <= (fullwidth - len(title) - 2) // (2*len(sepchar))
N = (fullwidth - len(title) - 2) // (2*len(sepchar))
fill = sepchar * N
line = "%s %s %s" % (fill, title, fill)
else:
# we want len(sepchar)*N <= fullwidth
# i.e. N <= fullwidth // len(sepchar)
line = sepchar * (fullwidth // len(sepchar))
# in some situations there is room for an extra sepchar at the right,
# in particular if we consider that with a sepchar like "_ " the
# trailing space is not important at the end of the line
if len(line) + len(sepchar.rstrip()) <= fullwidth:
line += sepchar.rstrip()
self.line(line)
class TerminalOut(Out):
tty = True
def __init__(self, file):
super(TerminalOut, self).__init__(file)
def sep(self, sepchar, title=None):
super(TerminalOut, self).sep(sepchar, title,
self.terminal_width)
def write(self, s):
self.file.write(str(s))
self.file.flush()
def line(self, s=''):
if s:
self.file.write(s + '\n')
else:
self.file.write('\n')
self.file.flush()
def xxxrewrite(self, s=''):
#self.write('\x1b[u%s' % s) - this escape sequence does
# strange things, or nothing at all, on various terminals.
# XXX what is it supposed to do in the first place??
self.write(s)
class FileOut(Out):
def write(self, s):
self.file.write(str(s))
self.file.flush()
def line(self, s=''):
if s:
self.file.write(str(s) + '\n')
else:
self.file.write('\n')
self.file.flush()
def xxxrewrite(self, s=''):
self.write(s)
def getout(file):
# XXX investigate further into terminal output, this is not enough
#
if file is None:
file = py.std.sys.stdout
elif hasattr(file, 'send'):
file = WriteFile(file.send)
elif callable(file):
file = WriteFile(file)
if hasattr(file, 'isatty') and file.isatty():
return TerminalOut(file)
else:
return FileOut(file)
class WriteFile(object):
def __init__(self, writemethod):
self.write = writemethod
def flush(self):
return

View File

@@ -0,0 +1 @@
#

View File

@@ -0,0 +1,160 @@
SVN-fs-dump-format-version: 2
UUID: 9cb23565-b10c-0410-b2e2-dde77f08022e
Revision-number: 0
Prop-content-length: 56
Content-length: 56
K 8
svn:date
V 27
2006-02-13T18:39:13.605561Z
PROPS-END
Revision-number: 1
Prop-content-length: 111
Content-length: 111
K 7
svn:log
V 13
A testdir
K 10
svn:author
V 3
hpk
K 8
svn:date
V 27
2006-02-13T18:39:27.723346Z
PROPS-END
Node-path: testdir
Node-kind: dir
Node-action: add
Prop-content-length: 10
Content-length: 10
PROPS-END
Revision-number: 2
Prop-content-length: 111
Content-length: 111
K 7
svn:log
V 13
_M testdir
K 10
svn:author
V 3
hpk
K 8
svn:date
V 27
2006-02-13T18:39:48.595729Z
PROPS-END
Node-path: testdir
Node-kind: dir
Node-action: change
Prop-content-length: 28
Content-length: 28
K 4
key1
V 4
val2
PROPS-END
Revision-number: 3
Prop-content-length: 113
Content-length: 113
K 7
svn:log
V 15
AM testdir2
K 10
svn:author
V 3
hpk
K 8
svn:date
V 27
2006-02-13T18:40:53.307540Z
PROPS-END
Node-path: testdir2
Node-kind: dir
Node-action: add
Prop-content-length: 28
Content-length: 28
K 4
key2
V 4
val2
PROPS-END
Revision-number: 4
Prop-content-length: 113
Content-length: 113
K 7
svn:log
V 15
D testdir2
K 10
svn:author
V 3
hpk
K 8
svn:date
V 27
2006-02-13T18:41:07.188024Z
PROPS-END
Node-path: testdir2
Node-action: delete
Revision-number: 5
Prop-content-length: 112
Content-length: 112
K 7
svn:log
V 14
_M testdir
K 10
svn:author
V 3
hpk
K 8
svn:date
V 27
2006-02-13T18:42:03.179177Z
PROPS-END
Node-path: testdir
Node-kind: dir
Node-action: change
Prop-content-length: 10
Content-length: 10
PROPS-END

View File

@@ -0,0 +1,44 @@
from py.test import raises
import py
import sys
import inspect
class TestAPI_V0_namespace_consistence:
def test_path_entrypoints(self):
assert inspect.ismodule(py.path)
assert_class('py.path', 'local')
assert_class('py.path', 'svnwc')
assert_class('py.path', 'svnurl')
def test_magic_entrypoints(self):
assert_function('py.magic', 'invoke')
assert_function('py.magic', 'revoke')
assert_function('py.magic', 'patch')
assert_function('py.magic', 'revoke')
def test_process_entrypoints(self):
assert_function('py.process', 'cmdexec')
def XXXtest_utest_entrypoints(self):
# XXX TOBECOMPLETED
assert_function('py.test', 'main')
#assert_module('std.utest', 'collect')
def assert_class(modpath, name):
mod = __import__(modpath, None, None, [name])
obj = getattr(mod, name)
assert inspect.isclass(obj)
# we don't test anymore that the exported classes have
# the exported module path and name on them.
#fullpath = modpath + '.' + name
#assert obj.__module__ == modpath
#if sys.version_info >= (2,3):
# assert obj.__name__ == name
def assert_function(modpath, name):
mod = __import__(modpath, None, None, [name])
obj = getattr(mod, name)
assert hasattr(obj, 'func_doc')
#assert obj.func_name == name

View File

@@ -0,0 +1,68 @@
import py
from py.__.misc.cache import BuildcostAccessCache, AgingCache
class BasicCacheAPITest:
cache = None
def test_getorbuild(self):
val = self.cache.getorbuild(-42, lambda: 42)
assert val == 42
val = self.cache.getorbuild(-42, lambda: 23)
assert val == 42
def test_cache_get_key_error(self):
assert self.cache.getentry(-23) == None
def test_delentry_non_raising(self):
val = self.cache.getorbuild(100, lambda: 100)
self.cache.delentry(100)
assert self.cache.getentry(100) is None
def test_delentry_raising(self):
val = self.cache.getorbuild(100, lambda: 100)
self.cache.delentry(100)
py.test.raises(KeyError, "self.cache.delentry(100, raising=True)")
class TestBuildcostAccess(BasicCacheAPITest):
cache = BuildcostAccessCache(maxentries=128)
def test_cache_works_somewhat_simple(self):
cache = BuildcostAccessCache()
for x in range(cache.maxentries):
y = cache.getorbuild(x, lambda: x)
assert x == y
for x in range(cache.maxentries):
assert cache.getorbuild(x, None) == x
for x in range(cache.maxentries/2):
assert cache.getorbuild(x, None) == x
assert cache.getorbuild(x, None) == x
assert cache.getorbuild(x, None) == x
val = cache.getorbuild(cache.maxentries * 2, lambda: 42)
assert val == 42
# check that recently used ones are still there
# and are not build again
for x in range(cache.maxentries/2):
assert cache.getorbuild(x, None) == x
assert cache.getorbuild(cache.maxentries*2, None) == 42
class TestAging(BasicCacheAPITest):
maxsecs = 0.02
cache = AgingCache(maxentries=128, maxseconds=maxsecs)
def test_cache_eviction(self):
self.cache.getorbuild(17, lambda: 17)
endtime = py.std.time.time() + self.maxsecs * 10
while py.std.time.time() < endtime:
if self.cache.getentry(17) is None:
break
py.std.time.sleep(self.maxsecs*0.3)
else:
py.test.fail("waiting for cache eviction failed")
def test_prune_lowestweight():
maxsecs = 0.05
cache = AgingCache(maxentries=10, maxseconds=maxsecs)
for x in range(cache.maxentries):
cache.getorbuild(x, lambda: x)
py.std.time.sleep(maxsecs*1.1)
cache.getorbuild(cache.maxentries+1, lambda: 42)

View File

@@ -0,0 +1,20 @@
import py
import errno
def test_error_classes():
for name in errno.errorcode.values():
x = getattr(py.error, name)
assert issubclass(x, py.error.Error)
assert issubclass(x, EnvironmentError)
def test_unknown_error():
num = 3999
cls = py.error._geterrnoclass(num)
assert cls.__name__ == 'UnknownErrno%d' % (num,)
assert issubclass(cls, py.error.Error)
assert issubclass(cls, EnvironmentError)
cls2 = py.error._geterrnoclass(num)
assert cls is cls2

View File

@@ -0,0 +1,267 @@
from __future__ import generators
import py
import types
import sys
def checksubpackage(name):
obj = getattr(py, name)
if hasattr(obj, '__map__'): # isinstance(obj, Module):
keys = dir(obj)
assert len(keys) > 0
assert getattr(obj, '__map__') == {}
def test_dir():
from py.__.initpkg import Module
for name in dir(py):
if name == 'magic': # greenlets don't work everywhere, we don't care here
continue
if not name.startswith('_'):
yield checksubpackage, name
from py.initpkg import Module
glob = []
class MyModule(Module):
def __init__(self, *args):
glob.append(self.__dict__)
assert isinstance(glob[-1], (dict, type(None)))
Module.__init__(self, *args)
def test_early__dict__access():
mymod = MyModule("whatever", "myname")
assert isinstance(mymod.__dict__, dict)
def test_resolve_attrerror():
extpyish = "./initpkg.py", "hello"
excinfo = py.test.raises(AttributeError, "py.__pkg__._resolve(extpyish)")
s = str(excinfo.value)
assert s.find(extpyish[0]) != -1
assert s.find(extpyish[1]) != -1
def test_virtual_module_identity():
from py import path as path1
from py import path as path2
assert path1 is path2
from py.path import local as local1
from py.path import local as local2
assert local1 is local2
def test_importall():
base = py.path.local(py.__file__).dirpath()
nodirs = (
base.join('test', 'testing', 'data'),
base.join('apigen', 'tracer', 'testing', 'package'),
base.join('test', 'testing', 'test'),
base.join('test', 'rsession', 'webjs.py'),
base.join('apigen', 'source', 'server.py'),
base.join('magic', 'greenlet.py'),
base.join('path', 'gateway',),
base.join('doc',),
base.join('rest', 'directive.py'),
base.join('test', 'testing', 'import_test'),
base.join('c-extension',),
base.join('test', 'report', 'web.py'),
base.join('test', 'report', 'webjs.py'),
base.join('test', 'report', 'rest.py'),
base.join('magic', 'greenlet.py'),
base.join('bin'),
base.join('execnet', 'script'),
base.join('compat', 'testing'),
)
for p in base.visit('*.py', lambda x: x.check(dotfile=0)):
if p.basename == '__init__.py':
continue
relpath = p.new(ext='').relto(base)
if base.sep in relpath: # not py/*.py itself
for x in nodirs:
if p == x or p.relto(x):
break
else:
relpath = relpath.replace(base.sep, '.')
modpath = 'py.__.%s' % relpath
yield check_import, modpath
def check_import(modpath):
print "checking import", modpath
assert __import__(modpath)
def test_shahexdigest():
hex = py.__pkg__.shahexdigest()
assert len(hex) == 40
def test_getzipdata():
s = py.__pkg__.getzipdata()
def test_getrev():
if not py.path.local(py.__file__).dirpath('.svn').check():
py.test.skip("py package is not a svn checkout")
d = py.__pkg__.getrev()
svnversion = py.path.local.sysfind('svnversion')
if svnversion is None:
py.test.skip("cannot test svnversion, 'svnversion' binary not found")
v = svnversion.sysexec(py.path.local(py.__file__).dirpath())
assert v.startswith(str(d))
# the following test should abasically work in the future
def XXXtest_virtual_on_the_fly():
py.initpkg('my', {
'x.abspath' : 'os.path.abspath',
'x.local' : 'py.path.local',
'y' : 'smtplib',
'z.cmdexec' : 'py.process.cmdexec',
})
from my.x import abspath
from my.x import local
import smtplib
from my import y
assert y is smtplib
from my.z import cmdexec
from py.process import cmdexec as cmdexec2
assert cmdexec is cmdexec2
#
# test support for importing modules
#
class TestRealModule:
def setup_class(cls):
cls.tmpdir = py.test.ensuretemp('test_initpkg')
sys.path = [str(cls.tmpdir)] + sys.path
pkgdir = cls.tmpdir.ensure('realtest', dir=1)
tfile = pkgdir.join('__init__.py')
tfile.write(py.code.Source("""
import py
py.initpkg('realtest', {
'x.module.__doc__': ('./testmodule.py', '__doc__'),
'x.module': ('./testmodule.py', '*'),
})
"""))
tfile = pkgdir.join('testmodule.py')
tfile.write(py.code.Source("""
'test module'
__all__ = ['mytest0', 'mytest1', 'MyTest']
def mytest0():
pass
def mytest1():
pass
class MyTest:
pass
"""))
import realtest # need to mimic what a user would do
#py.initpkg('realtest', {
# 'module': ('./testmodule.py', None)
#})
def setup_method(self, *args):
"""Unload the test modules before each test."""
module_names = ['realtest', 'realtest.x', 'realtest.x.module']
for modname in module_names:
if modname in sys.modules:
del sys.modules[modname]
def test_realmodule(self):
"""Testing 'import realtest.x.module'"""
import realtest.x.module
assert 'realtest.x.module' in sys.modules
assert getattr(realtest.x.module, 'mytest0')
def test_realmodule_from(self):
"""Testing 'from test import module'."""
from realtest.x import module
assert getattr(module, 'mytest1')
def test_realmodule_star(self):
"""Testing 'from test.module import *'."""
tfile = self.tmpdir.join('startest.py')
tfile.write(py.code.Source("""
from realtest.x.module import *
globals()['mytest0']
globals()['mytest1']
globals()['MyTest']
"""))
import startest # an exception will be raise if an error occurs
def test_realmodule_dict_import(self):
"Test verifying that accessing the __dict__ invokes the import"
import realtest.x.module
moddict = realtest.x.module.__dict__
assert 'mytest0' in moddict
assert 'mytest1' in moddict
assert 'MyTest' in moddict
def test_realmodule___doc__(self):
"""test whether the __doc__ attribute is set properly from initpkg"""
import realtest.x.module
assert realtest.x.module.__doc__ == 'test module'
#class TestStdHook:
# """Tests imports for the standard Python library hook."""
#
# def setup_method(self, *args):
# """Unload the test modules before each test."""
# module_names = ['py.std.StringIO', 'py.std', 'py']
# for modname in module_names:
# if modname in sys.modules:
# del sys.modules[modname]
#
# def test_std_import_simple(self):
# import py
# StringIO = py.std.StringIO
# assert 'py' in sys.modules
# assert 'py.std' in sys.modules
# assert 'py.std.StringIO' in sys.modules
# assert hasattr(py.std.StringIO, 'StringIO')
#
# def test_std_import0(self):
# """Testing 'import py.std.StringIO'."""
# import py.std.StringIO
# assert 'py' in sys.modules
# assert 'py.std' in sys.modules
# assert 'py.std.StringIO' in sys.modules
# assert hasattr(py.std.StringIO, 'StringIO')
#
# def test_std_import1(self):
# """Testing 'from py import std'."""
# from py import std
# assert 'py' in sys.modules
# assert 'py.std' in sys.modules
#
# def test_std_from(self):
# """Testing 'from py.std import StringIO'."""
# from py.std import StringIO
# assert getattr(StringIO, 'StringIO')
#
# def test_std_star(self):
# "Test from py.std.string import *"
# """Testing 'from test.module import *'."""
# tmpdir = py.test.ensuretemp('test_initpkg')
# tfile = tmpdir.join('stdstartest.py')
# tfile.write(py.code.Source("""if True:
# from realtest.module import *
# globals()['mytest0']
# globals()['mytest1']
# globals()['MyTest']
# """))
# import stdstartest # an exception will be raise if an error occurs
##def test_help():
# help(std.path)
# #assert False
def test_url_of_version():
#py.test.skip("FAILING! - provide a proper URL or upload pylib tgz")
from urllib import URLopener
url = py.__pkg__.download_url
if url.lower() == "xxx":
assert py.__pkg__.version.find("alpha") != -1
else:
URLopener().open(url)

View File

@@ -0,0 +1,19 @@
import py, sys
from py.__.misc.killproc import killproc
def test_win_killsubprocess():
if sys.platform == 'win32' and not py.path.local.sysfind('taskkill'):
py.test.skip("you\'re using an older version of windows, which "
"doesn\'t support 'taskkill' - py.misc.killproc is not "
"available")
tmp = py.test.ensuretemp("test_win_killsubprocess")
t = tmp.join("t.py")
t.write("import time ; time.sleep(100)")
proc = py.std.subprocess.Popen([sys.executable, str(t)])
assert proc.poll() is None # no return value yet
killproc(proc.pid)
ret = proc.wait()
assert ret != 0

View File

@@ -0,0 +1,13 @@
import py
def test_os():
import os
assert py.std.os is os
def test_import_error_converts_to_attributeerror():
py.test.raises(AttributeError, "py.std.xyzalskdj")
def test_std_gets_it():
for x in py.std.sys.modules:
assert x in py.std.__dict__

View File

@@ -0,0 +1,60 @@
import py
from py.__.misc import svnlook
data = py.magic.autopath().dirpath('data')
if py.path.local.sysfind('svnlook') is None or \
py.path.local.sysfind('svnadmin') is None:
py.test.skip("cannot test py.misc.svnlook, svn binaries not found")
def test_svnlook():
tempdir = py.test.ensuretemp("svnlook")
repo = tempdir.join("repo")
py.process.cmdexec('svnadmin create --fs-type fsfs "%s"' % repo)
py.process.cmdexec('svnadmin load "%s" < "%s"' %(repo,
data.join("svnlookrepo.dump")))
author = svnlook.author(repo, 1)
assert author == "hpk"
for item in svnlook.changed(repo, 1):
svnurl = item.svnurl()
assert item.revision == 1
assert (svnurl.strpath + "/") == "file://%s/%s" %(repo, item.path)
assert item.added
assert not item.modified
assert not item.propchanged
assert not item.deleted
assert item.path == "testdir/"
for item in svnlook.changed(repo, 2):
assert item.revision == 2
assert not item.added
assert not item.modified
assert item.propchanged
assert not item.deleted
assert item.path == "testdir/"
for item in svnlook.changed(repo, 3):
assert item.revision == 3
assert item.added
assert not item.modified
assert not item.propchanged
assert not item.deleted
assert item.path == "testdir2/"
for item in svnlook.changed(repo, 4):
assert item.revision == 4
assert not item.added
assert not item.modified
assert not item.propchanged
assert item.deleted
assert item.path == "testdir2/"
for item in svnlook.changed(repo, 5):
assert item.revision == 5
assert not item.added
assert not item.modified
assert item.propchanged
assert not item.deleted
assert item.path == "testdir/"

View File

@@ -0,0 +1,25 @@
import os
import py
from py.__.misc.terminal_helper import get_terminal_width
def test_terminal_width():
""" Dummy test for get_terminal_width
"""
assert get_terminal_width()
try:
import fcntl
except ImportError:
py.test.skip('fcntl not supported on this platform')
def f(*args):
raise ValueError
ioctl = fcntl.ioctl
fcntl.ioctl = f
try:
cols = os.environ.get('COLUMNS', None)
os.environ['COLUMNS'] = '42'
assert get_terminal_width() == 41
finally:
fcntl.ioctl = ioctl
if cols:
os.environ['COLUMNS'] = cols

View File

@@ -0,0 +1,75 @@
import py
import sys
here = py.magic.autopath().dirpath()
update_website = here.join('../../bin/_update_website.py').pyimport()
def test_rsync():
temp = py.test.ensuretemp('update_website_rsync')
pkgpath = temp.join('pkg')
apipath = temp.join('apigen')
pkgpath.ensure('foo/bar.txt', file=True).write('baz')
pkgpath.ensure('spam/eggs.txt', file=True).write('spam')
apipath.ensure('api/foo.html', file=True).write('<html />')
apipath.ensure('source/spam.html', file=True).write('<html />')
rsyncpath = temp.join('rsync')
assert not rsyncpath.check()
gateway = py.execnet.PopenGateway()
update_website.rsync(pkgpath, apipath, gateway, rsyncpath.strpath)
assert rsyncpath.check(dir=True)
assert rsyncpath.join('pkg').check(dir=True)
assert rsyncpath.join('pkg/spam/eggs.txt').read() == 'spam'
assert rsyncpath.join('apigen').check(dir=True)
assert rsyncpath.join('apigen/api/foo.html').read() == '<html />'
def setup_pkg(testname):
temp = py.test.ensuretemp(testname)
pkgpath = temp.ensure('pkg', dir=True)
pyfile = pkgpath.ensure('mod.py').write(py.code.Source("""
def foo(x):
return x + 1
"""))
testfile = pkgpath.ensure('test/test_mod.py').write(py.code.Source("""
from pkg.sub import foo
def test_foo():
assert foo(1) == 2
"""))
initfile = pkgpath.ensure('__init__.py').write(py.code.Source("""
import py
from py.__.initpkg import initpkg
initpkg(__name__, exportdefs={
'sub.foo': ('./mod.py', 'foo'),
})
"""))
initfile = pkgpath.ensure('apigen/apigen.py').write(py.code.Source("""
from py.__.apigen.apigen import build, \
get_documentable_items_pkgdir as get_documentable_items
"""))
return pkgpath
def test_run_tests():
if py.std.sys.platform == "win32":
py.test.skip("update_website is not supposed to be run from win32")
pkgpath = setup_pkg('update_website_run_tests')
errors = update_website.run_tests(pkgpath,
pkgpath.dirpath().join('apigen'),
captureouterr=True)
print errors
assert not errors
py.test.skip("Apigen turned off")
assert pkgpath.join('../apigen').check(dir=True)
assert pkgpath.join('../apigen/api/sub.foo.html').check(file=True)
def test_run_tests_failure():
if py.std.sys.platform == "win32":
py.test.skip("update_website is not supposed to be run from win32")
pkgpath = setup_pkg('update_website_run_tests_failure')
assert not pkgpath.join('../apigen').check(dir=True)
py.test.skip("Apigen turned off")
pkgpath.ensure('../apigen', file=True)
errors = update_website.run_tests(pkgpath,
pkgpath.dirpath().join('apigen'),
captureouterr=True)
assert errors # some error message