* fix capturing and unicode printing in tests
* introduce "_encoding" to py/io/terminalwriter writing * beautify a few __repr__ for better internal debugging --HG-- branch : 1.0.x
This commit is contained in:
parent
91597f4100
commit
8fcdac9dd6
|
@ -1,3 +1,10 @@
|
||||||
|
Changes between 1.0.0 and 1.0.1
|
||||||
|
=====================================
|
||||||
|
|
||||||
|
* various unicode fixes: capturing and prints of unicode strings now
|
||||||
|
work within tests, they are encoded as "utf8" by default, terminalwriting
|
||||||
|
was adapted and somewhat unified between windows and linux
|
||||||
|
|
||||||
Changes between 1.0.0b9 and 1.0.0
|
Changes between 1.0.0b9 and 1.0.0
|
||||||
=====================================
|
=====================================
|
||||||
|
|
||||||
|
|
10
py/_com.py
10
py/_com.py
|
@ -17,6 +17,16 @@ class MultiCall:
|
||||||
self.kwargs = kwargs
|
self.kwargs = kwargs
|
||||||
self.results = []
|
self.results = []
|
||||||
|
|
||||||
|
def __repr__(self):
|
||||||
|
args = []
|
||||||
|
if self.args:
|
||||||
|
args.append("posargs=%r" %(self.args,))
|
||||||
|
kw = self.kwargs
|
||||||
|
args.append(", ".join(["%s=%r" % x for x in self.kwargs.items()]))
|
||||||
|
args = " ".join(args)
|
||||||
|
status = "results: %r, rmethods: %r" % (self.results, self.methods)
|
||||||
|
return "<MultiCall %s %s>" %(args, status)
|
||||||
|
|
||||||
def execute(self, firstresult=False):
|
def execute(self, firstresult=False):
|
||||||
while self.methods:
|
while self.methods:
|
||||||
currentmethod = self.methods.pop()
|
currentmethod = self.methods.pop()
|
||||||
|
|
|
@ -76,7 +76,7 @@ class StdCaptureFD(Capture):
|
||||||
os.close(fd)
|
os.close(fd)
|
||||||
if out:
|
if out:
|
||||||
tmpfile = None
|
tmpfile = None
|
||||||
if isinstance(out, file):
|
if hasattr(out, 'write'):
|
||||||
tmpfile = out
|
tmpfile = out
|
||||||
self.out = py.io.FDCapture(1, tmpfile=tmpfile)
|
self.out = py.io.FDCapture(1, tmpfile=tmpfile)
|
||||||
if patchsys:
|
if patchsys:
|
||||||
|
@ -84,7 +84,7 @@ class StdCaptureFD(Capture):
|
||||||
if err:
|
if err:
|
||||||
if mixed and out:
|
if mixed and out:
|
||||||
tmpfile = self.out.tmpfile
|
tmpfile = self.out.tmpfile
|
||||||
elif isinstance(err, file):
|
elif hasattr(err, 'write'):
|
||||||
tmpfile = err
|
tmpfile = err
|
||||||
else:
|
else:
|
||||||
tmpfile = None
|
tmpfile = None
|
||||||
|
|
|
@ -139,6 +139,7 @@ class TerminalWriter(object):
|
||||||
Black=40, Red=41, Green=42, Yellow=43,
|
Black=40, Red=41, Green=42, Yellow=43,
|
||||||
Blue=44, Purple=45, Cyan=46, White=47,
|
Blue=44, Purple=45, Cyan=46, White=47,
|
||||||
bold=1, light=2, blink=5, invert=7)
|
bold=1, light=2, blink=5, invert=7)
|
||||||
|
_encoding = "utf-8"
|
||||||
|
|
||||||
def __init__(self, file=None, stringio=False):
|
def __init__(self, file=None, stringio=False):
|
||||||
if file is None:
|
if file is None:
|
||||||
|
@ -194,58 +195,27 @@ class TerminalWriter(object):
|
||||||
|
|
||||||
def write(self, s, **kw):
|
def write(self, s, **kw):
|
||||||
if s:
|
if s:
|
||||||
s = str(s)
|
s = self._getbytestring(s)
|
||||||
if self.hasmarkup and kw:
|
if self.hasmarkup and kw:
|
||||||
s = self.markup(s, **kw)
|
s = self.markup(s, **kw)
|
||||||
self._file.write(s)
|
self._file.write(s)
|
||||||
self._file.flush()
|
self._file.flush()
|
||||||
|
|
||||||
|
def _getbytestring(self, s):
|
||||||
|
if isinstance(s, unicode):
|
||||||
|
return s.encode(self._encoding)
|
||||||
|
elif not isinstance(s, str):
|
||||||
|
return str(s)
|
||||||
|
return s
|
||||||
|
|
||||||
def line(self, s='', **kw):
|
def line(self, s='', **kw):
|
||||||
self.write(s, **kw)
|
self.write(s, **kw)
|
||||||
self.write('\n')
|
self.write('\n')
|
||||||
|
|
||||||
class Win32ConsoleWriter(object):
|
class Win32ConsoleWriter(TerminalWriter):
|
||||||
|
|
||||||
def __init__(self, file=None, stringio=False):
|
|
||||||
if file is None:
|
|
||||||
if stringio:
|
|
||||||
self.stringio = file = py.std.cStringIO.StringIO()
|
|
||||||
else:
|
|
||||||
file = py.std.sys.stdout
|
|
||||||
elif callable(file):
|
|
||||||
file = WriteFile(file)
|
|
||||||
self._file = file
|
|
||||||
self.fullwidth = get_terminal_width()
|
|
||||||
self.hasmarkup = should_do_markup(file)
|
|
||||||
|
|
||||||
def sep(self, sepchar, title=None, fullwidth=None, **kw):
|
|
||||||
if fullwidth is None:
|
|
||||||
fullwidth = self.fullwidth
|
|
||||||
# the goal is to have the line be as long as possible
|
|
||||||
# under the condition that len(line) <= fullwidth
|
|
||||||
if title is not None:
|
|
||||||
# we want 2 + 2*len(fill) + len(title) <= fullwidth
|
|
||||||
# i.e. 2 + 2*len(sepchar)*N + len(title) <= fullwidth
|
|
||||||
# 2*len(sepchar)*N <= fullwidth - len(title) - 2
|
|
||||||
# N <= (fullwidth - len(title) - 2) // (2*len(sepchar))
|
|
||||||
N = (fullwidth - len(title) - 2) // (2*len(sepchar))
|
|
||||||
fill = sepchar * N
|
|
||||||
line = "%s %s %s" % (fill, title, fill)
|
|
||||||
else:
|
|
||||||
# we want len(sepchar)*N <= fullwidth
|
|
||||||
# i.e. N <= fullwidth // len(sepchar)
|
|
||||||
line = sepchar * (fullwidth // len(sepchar))
|
|
||||||
# in some situations there is room for an extra sepchar at the right,
|
|
||||||
# in particular if we consider that with a sepchar like "_ " the
|
|
||||||
# trailing space is not important at the end of the line
|
|
||||||
if len(line) + len(sepchar.rstrip()) <= fullwidth:
|
|
||||||
line += sepchar.rstrip()
|
|
||||||
|
|
||||||
self.line(line, **kw)
|
|
||||||
|
|
||||||
def write(self, s, **kw):
|
def write(self, s, **kw):
|
||||||
if s:
|
if s:
|
||||||
s = str(s)
|
s = self._getbytestring(s)
|
||||||
if self.hasmarkup:
|
if self.hasmarkup:
|
||||||
handle = GetStdHandle(STD_OUTPUT_HANDLE)
|
handle = GetStdHandle(STD_OUTPUT_HANDLE)
|
||||||
|
|
||||||
|
@ -269,8 +239,8 @@ class Win32ConsoleWriter(object):
|
||||||
if self.hasmarkup:
|
if self.hasmarkup:
|
||||||
SetConsoleTextAttribute(handle, FOREGROUND_WHITE)
|
SetConsoleTextAttribute(handle, FOREGROUND_WHITE)
|
||||||
|
|
||||||
def line(self, s='', **kw):
|
def line(self, s="", **kw):
|
||||||
self.write(s + '\n', **kw)
|
self.write(s+"\n", **kw)
|
||||||
|
|
||||||
if sys.platform == 'win32':
|
if sys.platform == 'win32':
|
||||||
TerminalWriter = Win32ConsoleWriter
|
TerminalWriter = Win32ConsoleWriter
|
||||||
|
|
|
@ -37,6 +37,16 @@ class BaseTests:
|
||||||
assert len(l) == 1
|
assert len(l) == 1
|
||||||
assert l[0] == "hello\n"
|
assert l[0] == "hello\n"
|
||||||
|
|
||||||
|
def test_line_unicode(self):
|
||||||
|
tw = self.getwriter()
|
||||||
|
for encoding in 'utf8', 'latin1':
|
||||||
|
tw._encoding = encoding
|
||||||
|
msg = unicode('b\u00f6y', 'utf8')
|
||||||
|
tw.line(msg)
|
||||||
|
l = self.getlines()
|
||||||
|
assert not isinstance(l[0], unicode)
|
||||||
|
assert unicode(l[0], encoding) == msg + "\n"
|
||||||
|
|
||||||
def test_sep_no_title(self):
|
def test_sep_no_title(self):
|
||||||
tw = self.getwriter()
|
tw = self.getwriter()
|
||||||
tw.sep("-", fullwidth=60)
|
tw.sep("-", fullwidth=60)
|
||||||
|
@ -85,6 +95,16 @@ class BaseTests:
|
||||||
l = self.getlines()
|
l = self.getlines()
|
||||||
assert len(l[0]) == len(l[1])
|
assert len(l[0]) == len(l[1])
|
||||||
|
|
||||||
|
class TestTmpfile(BaseTests):
|
||||||
|
def getwriter(self):
|
||||||
|
self.path = py.test.config.ensuretemp("terminalwriter").ensure("tmpfile")
|
||||||
|
self.tw = py.io.TerminalWriter(self.path.open('w+'))
|
||||||
|
return self.tw
|
||||||
|
def getlines(self):
|
||||||
|
io = self.tw._file
|
||||||
|
io.flush()
|
||||||
|
return self.path.open('r').readlines()
|
||||||
|
|
||||||
class TestStringIO(BaseTests):
|
class TestStringIO(BaseTests):
|
||||||
def getwriter(self):
|
def getwriter(self):
|
||||||
self.tw = py.io.TerminalWriter(stringio=True)
|
self.tw = py.io.TerminalWriter(stringio=True)
|
||||||
|
|
|
@ -10,6 +10,7 @@ class TestMultiCall:
|
||||||
def test_uses_copy_of_methods(self):
|
def test_uses_copy_of_methods(self):
|
||||||
l = [lambda: 42]
|
l = [lambda: 42]
|
||||||
mc = MultiCall(l)
|
mc = MultiCall(l)
|
||||||
|
repr(mc)
|
||||||
l[:] = []
|
l[:] = []
|
||||||
res = mc.execute()
|
res = mc.execute()
|
||||||
return res == 42
|
return res == 42
|
||||||
|
@ -33,16 +34,27 @@ class TestMultiCall:
|
||||||
p1 = P1()
|
p1 = P1()
|
||||||
p2 = P2()
|
p2 = P2()
|
||||||
multicall = MultiCall([p1.m, p2.m], 23)
|
multicall = MultiCall([p1.m, p2.m], 23)
|
||||||
|
assert "23" in repr(multicall)
|
||||||
reslist = multicall.execute()
|
reslist = multicall.execute()
|
||||||
assert len(reslist) == 2
|
assert len(reslist) == 2
|
||||||
# ensure reversed order
|
# ensure reversed order
|
||||||
assert reslist == [23, 17]
|
assert reslist == [23, 17]
|
||||||
|
|
||||||
|
def test_keyword_args(self):
|
||||||
|
def f(x):
|
||||||
|
return x + 1
|
||||||
|
multicall = MultiCall([f], x=23)
|
||||||
|
assert "x=23" in repr(multicall)
|
||||||
|
reslist = multicall.execute()
|
||||||
|
assert reslist == [24]
|
||||||
|
assert "24" in repr(multicall)
|
||||||
|
|
||||||
def test_optionalcallarg(self):
|
def test_optionalcallarg(self):
|
||||||
class P1:
|
class P1:
|
||||||
def m(self, x):
|
def m(self, x):
|
||||||
return x
|
return x
|
||||||
call = MultiCall([P1().m], 23)
|
call = MultiCall([P1().m], 23)
|
||||||
|
assert "23" in repr(call)
|
||||||
assert call.execute() == [23]
|
assert call.execute() == [23]
|
||||||
assert call.execute(firstresult=True) == 23
|
assert call.execute(firstresult=True) == 23
|
||||||
|
|
||||||
|
|
|
@ -145,11 +145,9 @@ class SlaveNode(object):
|
||||||
if call.excinfo:
|
if call.excinfo:
|
||||||
# likely it is not collectable here because of
|
# likely it is not collectable here because of
|
||||||
# platform/import-dependency induced skips
|
# platform/import-dependency induced skips
|
||||||
# XXX somewhat ugly shortcuts - also makes a collection
|
# we fake a setup-error report with the obtained exception
|
||||||
# failure into an ItemTestReport - this might confuse
|
# and do not care about capturing or non-runner hooks
|
||||||
# pytest_runtest_logreport hooks
|
|
||||||
rep = self.runner.pytest_runtest_makereport(item=item, call=call)
|
rep = self.runner.pytest_runtest_makereport(item=item, call=call)
|
||||||
self.pytest_runtest_logreport(rep)
|
self.pytest_runtest_logreport(rep)
|
||||||
return
|
return
|
||||||
item.config.hook.pytest_runtest_protocol(item=item)
|
item.config.hook.pytest_runtest_protocol(item=item)
|
||||||
|
|
||||||
|
|
|
@ -107,11 +107,24 @@ class CaptureManager:
|
||||||
def __init__(self):
|
def __init__(self):
|
||||||
self._method2capture = {}
|
self._method2capture = {}
|
||||||
|
|
||||||
|
def _maketempfile(self):
|
||||||
|
f = py.std.tempfile.TemporaryFile()
|
||||||
|
newf = py.io.dupfile(f)
|
||||||
|
f.close()
|
||||||
|
return ustream(newf)
|
||||||
|
|
||||||
|
def _makestringio(self):
|
||||||
|
return py.std.StringIO.StringIO()
|
||||||
|
|
||||||
def _startcapture(self, method):
|
def _startcapture(self, method):
|
||||||
if method == "fd":
|
if method == "fd":
|
||||||
return py.io.StdCaptureFD()
|
return py.io.StdCaptureFD(
|
||||||
|
out=self._maketempfile(), err=self._maketempfile()
|
||||||
|
)
|
||||||
elif method == "sys":
|
elif method == "sys":
|
||||||
return py.io.StdCapture()
|
return py.io.StdCapture(
|
||||||
|
out=self._makestringio(), err=self._makestringio()
|
||||||
|
)
|
||||||
else:
|
else:
|
||||||
raise ValueError("unknown capturing method: %r" % method)
|
raise ValueError("unknown capturing method: %r" % method)
|
||||||
|
|
||||||
|
@ -252,3 +265,13 @@ class CaptureFuncarg:
|
||||||
def close(self):
|
def close(self):
|
||||||
self.capture.reset()
|
self.capture.reset()
|
||||||
del self.capture
|
del self.capture
|
||||||
|
|
||||||
|
def ustream(f):
|
||||||
|
import codecs
|
||||||
|
encoding = getattr(f, 'encoding', None) or "UTF-8"
|
||||||
|
reader = codecs.getreader(encoding)
|
||||||
|
writer = codecs.getwriter(encoding)
|
||||||
|
srw = codecs.StreamReaderWriter(f, reader, writer)
|
||||||
|
srw.encoding = encoding
|
||||||
|
return srw
|
||||||
|
|
||||||
|
|
|
@ -108,6 +108,13 @@ class CallInfo:
|
||||||
except:
|
except:
|
||||||
self.excinfo = py.code.ExceptionInfo()
|
self.excinfo = py.code.ExceptionInfo()
|
||||||
|
|
||||||
|
def __repr__(self):
|
||||||
|
if self.excinfo:
|
||||||
|
status = "exception: %s" % str(self.excinfo.value)
|
||||||
|
else:
|
||||||
|
status = "result: %r" % (self.result,)
|
||||||
|
return "<CallInfo when=%r %s>" % (self.when, status)
|
||||||
|
|
||||||
def forked_run_report(item):
|
def forked_run_report(item):
|
||||||
# for now, we run setup/teardown in the subprocess
|
# for now, we run setup/teardown in the subprocess
|
||||||
# XXX optionally allow sharing of setup/teardown
|
# XXX optionally allow sharing of setup/teardown
|
||||||
|
|
|
@ -1,5 +1,5 @@
|
||||||
import py, os, sys
|
import py, os, sys
|
||||||
from py.__.test.plugin.pytest_capture import CaptureManager
|
from py.__.test.plugin.pytest_capture import CaptureManager, ustream
|
||||||
|
|
||||||
class TestCaptureManager:
|
class TestCaptureManager:
|
||||||
|
|
||||||
|
@ -54,6 +54,29 @@ class TestCaptureManager:
|
||||||
finally:
|
finally:
|
||||||
capouter.reset()
|
capouter.reset()
|
||||||
|
|
||||||
|
@py.test.mark.multi(method=['fd', 'sys'])
|
||||||
|
def test_capturing_unicode(testdir, method):
|
||||||
|
testdir.makepyfile("""
|
||||||
|
# taken from issue 227 from nosests
|
||||||
|
def test_unicode():
|
||||||
|
import sys
|
||||||
|
print sys.stdout
|
||||||
|
print u'b\\u00f6y'
|
||||||
|
""")
|
||||||
|
result = testdir.runpytest("--capture=%s" % method)
|
||||||
|
result.stdout.fnmatch_lines([
|
||||||
|
"*1 passed*"
|
||||||
|
])
|
||||||
|
|
||||||
|
def test_ustream_helper(testdir):
|
||||||
|
p = testdir.makepyfile("hello")
|
||||||
|
f = p.open('w')
|
||||||
|
#f.encoding = "utf8"
|
||||||
|
x = ustream(f)
|
||||||
|
x.write(u'b\\00f6y')
|
||||||
|
x.close()
|
||||||
|
|
||||||
|
|
||||||
def test_collect_capturing(testdir):
|
def test_collect_capturing(testdir):
|
||||||
p = testdir.makepyfile("""
|
p = testdir.makepyfile("""
|
||||||
print "collect %s failure" % 13
|
print "collect %s failure" % 13
|
||||||
|
|
|
@ -271,3 +271,13 @@ def test_functional_boxed(testdir):
|
||||||
"*1 failed*"
|
"*1 failed*"
|
||||||
])
|
])
|
||||||
|
|
||||||
|
def test_callinfo():
|
||||||
|
ci = runner.CallInfo(lambda: 0, '123')
|
||||||
|
assert ci.when == "123"
|
||||||
|
assert ci.result == 0
|
||||||
|
assert "result" in repr(ci)
|
||||||
|
ci = runner.CallInfo(lambda: 0/0, '123')
|
||||||
|
assert ci.when == "123"
|
||||||
|
assert not hasattr(ci, 'result')
|
||||||
|
assert ci.excinfo
|
||||||
|
assert "exc" in repr(ci)
|
||||||
|
|
Loading…
Reference in New Issue