* introduce py.io.TextIO and py.io.StringIO to help with 3k transition and to clarify

intentions when doing "in-memory" files. Replace most usages of StringIO.

* consolidate py.io's files and tests into fewer files, make files 3k-importable

--HG--
branch : trunk
This commit is contained in:
holger krekel
2009-08-20 20:47:39 +02:00
parent 046ac957ab
commit 1fcd373bd5
23 changed files with 255 additions and 268 deletions

View File

@@ -1,8 +1,100 @@
import os
import sys
import py
try: from cStringIO import StringIO
except ImportError: from StringIO import StringIO
import tempfile
try:
from io import StringIO
except ImportError:
from StringIO import StringIO
class TextIO(StringIO):
def write(self, data):
if not isinstance(data, unicode):
data = unicode(data, getattr(self, '_encoding', 'UTF-8'))
StringIO.write(self, data)
try:
from io import BytesIO
except ImportError:
class BytesIO(StringIO):
def write(self, data):
if isinstance(data, unicode):
raise TypeError("not a byte value: %r" %(data,))
StringIO.write(self, data)
class FDCapture:
""" Capture IO to/from a given os-level filedescriptor. """
def __init__(self, targetfd, tmpfile=None):
self.targetfd = targetfd
if tmpfile is None:
tmpfile = self.maketmpfile()
self.tmpfile = tmpfile
self._savefd = os.dup(targetfd)
os.dup2(self.tmpfile.fileno(), targetfd)
self._patched = []
def setasfile(self, name, module=sys):
""" patch <module>.<name> to self.tmpfile
"""
key = (module, name)
self._patched.append((key, getattr(module, name)))
setattr(module, name, self.tmpfile)
def unsetfiles(self):
""" unpatch all patched items
"""
while self._patched:
(module, name), value = self._patched.pop()
setattr(module, name, value)
def done(self):
""" unpatch and clean up, returns the self.tmpfile (file object)
"""
os.dup2(self._savefd, self.targetfd)
self.unsetfiles()
os.close(self._savefd)
self.tmpfile.seek(0)
return self.tmpfile
def maketmpfile(self):
""" create a temporary file
"""
f = tempfile.TemporaryFile()
newf = dupfile(f)
f.close()
return newf
def writeorg(self, str):
""" write a string to the original file descriptor
"""
tempfp = tempfile.TemporaryFile()
try:
os.dup2(self._savefd, tempfp.fileno())
tempfp.write(str)
finally:
tempfp.close()
def dupfile(f, mode=None, buffering=0, raising=False):
""" return a new open file object that's a duplicate of f
mode is duplicated if not given, 'buffering' controls
buffer size (defaulting to no buffering) and 'raising'
defines whether an exception is raised when an incompatible
file object is passed in (if raising is False, the file
object itself will be returned)
"""
try:
fd = f.fileno()
except AttributeError:
if raising:
raise
return f
newfd = os.dup(fd)
mode = mode and mode or f.mode
return os.fdopen(newfd, mode, buffering)
class Capture(object):
@@ -141,14 +233,14 @@ class StdCapture(Capture):
if out:
self._oldout = sys.stdout
if not hasattr(out, 'write'):
out = StringIO()
out = TextIO()
sys.stdout = self.out = out
if err:
self._olderr = sys.stderr
if out and mixed:
err = self.out
elif not hasattr(err, 'write'):
err = StringIO()
err = TextIO()
sys.stderr = self.err = err
if in_:
self._oldin = sys.stdin

View File

@@ -1,22 +0,0 @@
import os
def dupfile(f, mode=None, buffering=0, raising=False):
""" return a new open file object that's a duplicate of f
mode is duplicated if not given, 'buffering' controls
buffer size (defaulting to no buffering) and 'raising'
defines whether an exception is raised when an incompatible
file object is passed in (if raising is False, the file
object itself will be returned)
"""
try:
fd = f.fileno()
except AttributeError:
if raising:
raise
return f
newfd = os.dup(fd)
mode = mode and mode or f.mode
return os.fdopen(newfd, mode, buffering)

View File

@@ -1,59 +0,0 @@
import os
import sys
import py
import tempfile
class FDCapture:
""" Capture IO to/from a given os-level filedescriptor. """
def __init__(self, targetfd, tmpfile=None):
self.targetfd = targetfd
if tmpfile is None:
tmpfile = self.maketmpfile()
self.tmpfile = tmpfile
self._savefd = os.dup(targetfd)
os.dup2(self.tmpfile.fileno(), targetfd)
self._patched = []
def setasfile(self, name, module=sys):
""" patch <module>.<name> to self.tmpfile
"""
key = (module, name)
self._patched.append((key, getattr(module, name)))
setattr(module, name, self.tmpfile)
def unsetfiles(self):
""" unpatch all patched items
"""
while self._patched:
(module, name), value = self._patched.pop()
setattr(module, name, value)
def done(self):
""" unpatch and clean up, returns the self.tmpfile (file object)
"""
os.dup2(self._savefd, self.targetfd)
self.unsetfiles()
os.close(self._savefd)
self.tmpfile.seek(0)
return self.tmpfile
def maketmpfile(self):
""" create a temporary file
"""
f = tempfile.TemporaryFile()
newf = py.io.dupfile(f)
f.close()
return newf
def writeorg(self, str):
""" write a string to the original file descriptor
"""
tempfp = tempfile.TemporaryFile()
try:
os.dup2(self._savefd, tempfp.fileno())
tempfp.write(str)
finally:
tempfp.close()

View File

@@ -70,7 +70,7 @@ if sys.platform == 'win32':
def get_terminal_width():
try:
height, width = _getdimensions()
except (SystemExit, KeyboardInterrupt), e:
except (SystemExit, KeyboardInterrupt):
raise
except:
# FALLBACK
@@ -144,7 +144,7 @@ class TerminalWriter(object):
def __init__(self, file=None, stringio=False):
if file is None:
if stringio:
self.stringio = file = py.std.cStringIO.StringIO()
self.stringio = file = py.io.TextIO()
else:
file = py.std.sys.stdout
elif callable(file):

View File

@@ -1,8 +1,31 @@
import os, sys
import py
class TestTextIO:
def test_text(self):
f = py.io.TextIO()
f.write("hello")
s = f.getvalue()
assert s == "hello"
f.close()
def test_unicode_and_str_mixture(self):
f = py.io.TextIO()
f.write(u"\u00f6")
f.write(str("hello"))
s = f.getvalue()
f.close()
assert isinstance(s, unicode)
def test_bytes_io():
f = py.io.BytesIO()
f.write("hello")
py.test.raises(TypeError, "f.write(u'hello')")
s = f.getvalue()
assert s == "hello"
def test_dontreadfrominput():
from py.__.io.stdcapture import DontReadFromInput
from py.__.io.capture import DontReadFromInput
f = DontReadFromInput()
assert not f.isatty()
py.test.raises(IOError, f.read)
@@ -10,6 +33,81 @@ def test_dontreadfrominput():
py.test.raises(IOError, iter, f)
py.test.raises(ValueError, f.fileno)
def test_dupfile():
somefile = py.std.os.tmpfile()
flist = []
for i in range(5):
nf = py.io.dupfile(somefile)
assert nf != somefile
assert nf.fileno() != somefile.fileno()
assert nf not in flist
print >>nf, i,
flist.append(nf)
for i in range(5):
f = flist[i]
f.close()
somefile.seek(0)
s = somefile.read()
assert s.startswith("01234")
somefile.close()
class TestFDCapture:
def test_basic(self):
tmpfile = py.std.os.tmpfile()
fd = tmpfile.fileno()
cap = py.io.FDCapture(fd)
os.write(fd, "hello")
f = cap.done()
s = f.read()
assert s == "hello"
def test_stderr(self):
cap = py.io.FDCapture(2)
cap.setasfile('stderr')
print >>sys.stderr, "hello"
f = cap.done()
s = f.read()
assert s == "hello\n"
def test_stdin(self):
f = os.tmpfile()
print >>f, "3"
f.seek(0)
cap = py.io.FDCapture(0, tmpfile=f)
# check with os.read() directly instead of raw_input(), because
# sys.stdin itself may be redirected (as py.test now does by default)
x = os.read(0, 100).strip()
f = cap.done()
assert x == "3"
def test_writeorg(self):
tmppath = py.test.ensuretemp('test_writeorg').ensure('stderr',
file=True)
tmpfp = tmppath.open('w+b')
try:
cap = py.io.FDCapture(tmpfp.fileno())
print >>tmpfp, 'foo'
cap.writeorg('bar\n')
finally:
tmpfp.close()
f = cap.done()
scap = f.read()
assert scap == 'foo\n'
stmp = tmppath.read()
assert stmp == "bar\n"
def test_writeorg_wrongtype(self):
tmppath = py.test.ensuretemp('test_writeorg').ensure('stdout',
file=True)
tmpfp = tmppath.open('r')
try:
cap = py.io.FDCapture(tmpfp.fileno())
py.test.raises(IOError, "cap.writeorg('bar\\n')")
finally:
tmpfp.close()
f = cap.done()
class TestStdCapture:
def getcapture(self, **kw):
return py.io.StdCapture(**kw)
@@ -64,8 +162,8 @@ class TestStdCapture:
cap = self.getcapture()
print "hello",
print >>sys.stderr, "world",
sys.stdout = py.std.StringIO.StringIO()
sys.stderr = py.std.StringIO.StringIO()
sys.stdout = py.io.TextIO()
sys.stderr = py.io.TextIO()
print "not seen"
print >>sys.stderr, "not seen"
out, err = cap.reset()

View File

@@ -1,20 +0,0 @@
import py
def test_dupfile():
somefile = py.std.os.tmpfile()
flist = []
for i in range(5):
nf = py.io.dupfile(somefile)
assert nf != somefile
assert nf.fileno() != somefile.fileno()
assert nf not in flist
print >>nf, i,
flist.append(nf)
for i in range(5):
f = flist[i]
f.close()
somefile.seek(0)
s = somefile.read()
assert s.startswith("01234")
somefile.close()

View File

@@ -1,59 +0,0 @@
import os, sys
import py
class TestFDCapture:
def test_basic(self):
tmpfile = py.std.os.tmpfile()
fd = tmpfile.fileno()
cap = py.io.FDCapture(fd)
os.write(fd, "hello")
f = cap.done()
s = f.read()
assert s == "hello"
def test_stderr(self):
cap = py.io.FDCapture(2)
cap.setasfile('stderr')
print >>sys.stderr, "hello"
f = cap.done()
s = f.read()
assert s == "hello\n"
def test_stdin(self):
f = os.tmpfile()
print >>f, "3"
f.seek(0)
cap = py.io.FDCapture(0, tmpfile=f)
# check with os.read() directly instead of raw_input(), because
# sys.stdin itself may be redirected (as py.test now does by default)
x = os.read(0, 100).strip()
f = cap.done()
assert x == "3"
def test_writeorg(self):
tmppath = py.test.ensuretemp('test_writeorg').ensure('stderr',
file=True)
tmpfp = tmppath.open('w+b')
try:
cap = py.io.FDCapture(tmpfp.fileno())
print >>tmpfp, 'foo'
cap.writeorg('bar\n')
finally:
tmpfp.close()
f = cap.done()
scap = f.read()
assert scap == 'foo\n'
stmp = tmppath.read()
assert stmp == "bar\n"
def test_writeorg_wrongtype(self):
tmppath = py.test.ensuretemp('test_writeorg').ensure('stdout',
file=True)
tmpfp = tmppath.open('r')
try:
cap = py.io.FDCapture(tmpfp.fileno())
py.test.raises(IOError, "cap.writeorg('bar\\n')")
finally:
tmpfp.close()
f = cap.done()

View File

@@ -1,7 +1,6 @@
import py
import os, sys
from py.__.io import terminalwriter
import StringIO
def skip_win32():
if sys.platform == 'win32':
@@ -23,12 +22,23 @@ def test_terminalwriter_default_instantiation():
def test_terminalwriter_dumb_term_no_markup(monkeypatch):
monkeypatch.setattr(os, 'environ', {'TERM': 'dumb', 'PATH': ''})
monkeypatch.setattr(sys, 'stdout', StringIO.StringIO())
monkeypatch.setattr(sys.stdout, 'isatty', lambda:True)
class MyFile:
def isatty(self):
return True
monkeypatch.setattr(sys, 'stdout', MyFile())
assert sys.stdout.isatty()
tw = py.io.TerminalWriter()
assert not tw.hasmarkup
def test_unicode_encoding():
l = []
msg = unicode('b\u00f6y', 'utf8')
for encoding in 'utf8', 'latin1':
tw = py.io.TerminalWriter(l.append)
tw._encoding = encoding
tw.line(msg)
assert l[0] == msg.encode(encoding)
class BaseTests:
def test_line(self):
tw = self.getwriter()
@@ -44,8 +54,7 @@ class BaseTests:
msg = unicode('b\u00f6y', 'utf8')
tw.line(msg)
l = self.getlines()
assert not isinstance(l[0], unicode)
assert unicode(l[0], encoding) == msg + "\n"
assert l[0] == msg + "\n"
def test_sep_no_title(self):
tw = self.getwriter()
@@ -105,7 +114,7 @@ class TestTmpfile(BaseTests):
io.flush()
return self.path.open('r').readlines()
class TestStringIO(BaseTests):
class TestWithStringIO(BaseTests):
def getwriter(self):
self.tw = py.io.TerminalWriter(stringio=True)
return self.tw
@@ -120,7 +129,7 @@ class TestCallableFile(BaseTests):
return py.io.TerminalWriter(self.writes.append)
def getlines(self):
io = py.std.cStringIO.StringIO()
io = py.io.TextIO()
io.write("".join(self.writes))
io.seek(0)
return io.readlines()