From 97bcf5a3a2fdabfdbc61bee275e97d09be728b08 Mon Sep 17 00:00:00 2001 From: Ran Benita Date: Thu, 16 Apr 2020 11:29:13 +0300 Subject: [PATCH] capture: reorder file into sections and avoid forward references Make it easier to read the file in progression, and avoid forward references for upcoming type annotations. There is one cycle, CaptureManager <-> CaptureFixture, which is hard to untangle. (This commit should be added to `.gitblameignore`). --- src/_pytest/capture.py | 1018 ++++++++++++++++++++-------------------- 1 file changed, 521 insertions(+), 497 deletions(-) diff --git a/src/_pytest/capture.py b/src/_pytest/capture.py index 1eb5e1b41..7a5e854ef 100644 --- a/src/_pytest/capture.py +++ b/src/_pytest/capture.py @@ -21,8 +21,6 @@ if TYPE_CHECKING: _CaptureMethod = Literal["fd", "sys", "no", "tee-sys"] -patchsysdict = {0: "stdin", 1: "stdout", 2: "stderr"} - def pytest_addoption(parser): group = parser.getgroup("general") @@ -43,6 +41,105 @@ def pytest_addoption(parser): ) +def _colorama_workaround(): + """ + Ensure colorama is imported so that it attaches to the correct stdio + handles on Windows. + + colorama uses the terminal on import time. So if something does the + first import of colorama while I/O capture is active, colorama will + fail in various ways. + """ + if sys.platform.startswith("win32"): + try: + import colorama # noqa: F401 + except ImportError: + pass + + +def _readline_workaround(): + """ + Ensure readline is imported so that it attaches to the correct stdio + handles on Windows. + + Pdb uses readline support where available--when not running from the Python + prompt, the readline module is not imported until running the pdb REPL. If + running pytest with the --pdb option this means the readline module is not + imported until after I/O capture has been started. + + This is a problem for pyreadline, which is often used to implement readline + support on Windows, as it does not attach to the correct handles for stdout + and/or stdin if they have been redirected by the FDCapture mechanism. This + workaround ensures that readline is imported before I/O capture is setup so + that it can attach to the actual stdin/out for the console. + + See https://github.com/pytest-dev/pytest/pull/1281 + """ + if sys.platform.startswith("win32"): + try: + import readline # noqa: F401 + except ImportError: + pass + + +def _py36_windowsconsoleio_workaround(stream): + """ + Python 3.6 implemented unicode console handling for Windows. This works + by reading/writing to the raw console handle using + ``{Read,Write}ConsoleW``. + + The problem is that we are going to ``dup2`` over the stdio file + descriptors when doing ``FDCapture`` and this will ``CloseHandle`` the + handles used by Python to write to the console. Though there is still some + weirdness and the console handle seems to only be closed randomly and not + on the first call to ``CloseHandle``, or maybe it gets reopened with the + same handle value when we suspend capturing. + + The workaround in this case will reopen stdio with a different fd which + also means a different handle by replicating the logic in + "Py_lifecycle.c:initstdio/create_stdio". + + :param stream: in practice ``sys.stdout`` or ``sys.stderr``, but given + here as parameter for unittesting purposes. + + See https://github.com/pytest-dev/py/issues/103 + """ + if ( + not sys.platform.startswith("win32") + or sys.version_info[:2] < (3, 6) + or hasattr(sys, "pypy_version_info") + ): + return + + # bail out if ``stream`` doesn't seem like a proper ``io`` stream (#2666) + if not hasattr(stream, "buffer"): + return + + buffered = hasattr(stream.buffer, "raw") + raw_stdout = stream.buffer.raw if buffered else stream.buffer + + if not isinstance(raw_stdout, io._WindowsConsoleIO): + return + + def _reopen_stdio(f, mode): + if not buffered and mode[0] == "w": + buffering = 0 + else: + buffering = -1 + + return io.TextIOWrapper( + open(os.dup(f.fileno()), mode, buffering), + f.encoding, + f.errors, + f.newlines, + f.line_buffering, + ) + + sys.stdin = _reopen_stdio(sys.stdin, "rb") + sys.stdout = _reopen_stdio(sys.stdout, "wb") + sys.stderr = _reopen_stdio(sys.stderr, "wb") + + @pytest.hookimpl(hookwrapper=True) def pytest_load_initial_conftests(early_config: Config): ns = early_config.known_args_namespace @@ -67,7 +164,362 @@ def pytest_load_initial_conftests(early_config: Config): sys.stderr.write(err) -def _get_multicapture(method: "_CaptureMethod") -> "MultiCapture": +# IO Helpers. + + +class EncodedFile(io.TextIOWrapper): + __slots__ = () + + @property + def name(self) -> str: + # Ensure that file.name is a string. Workaround for a Python bug + # fixed in >=3.7.4: https://bugs.python.org/issue36015 + return repr(self.buffer) + + @property + def mode(self) -> str: + # TextIOWrapper doesn't expose a mode, but at least some of our + # tests check it. + return self.buffer.mode.replace("b", "") + + +class CaptureIO(io.TextIOWrapper): + def __init__(self) -> None: + super().__init__(io.BytesIO(), encoding="UTF-8", newline="", write_through=True) + + def getvalue(self) -> str: + assert isinstance(self.buffer, io.BytesIO) + return self.buffer.getvalue().decode("UTF-8") + + +class TeeCaptureIO(CaptureIO): + def __init__(self, other: TextIO) -> None: + self._other = other + super().__init__() + + def write(self, s) -> int: + super().write(s) + return self._other.write(s) + + +class DontReadFromInput: + encoding = None + + def read(self, *args): + raise OSError( + "pytest: reading from stdin while output is captured! Consider using `-s`." + ) + + readline = read + readlines = read + __next__ = read + + def __iter__(self): + return self + + def fileno(self): + raise UnsupportedOperation("redirected stdin is pseudofile, has no fileno()") + + def isatty(self): + return False + + def close(self): + pass + + @property + def buffer(self): + return self + + +# Capture classes. + + +patchsysdict = {0: "stdin", 1: "stdout", 2: "stderr"} + + +class NoCapture: + EMPTY_BUFFER = None + __init__ = start = done = suspend = resume = lambda *args: None + + +class SysCaptureBinary: + + EMPTY_BUFFER = b"" + _state = None + + def __init__(self, fd, tmpfile=None, *, tee=False): + name = patchsysdict[fd] + self._old = getattr(sys, name) + self.name = name + if tmpfile is None: + if name == "stdin": + tmpfile = DontReadFromInput() + else: + tmpfile = CaptureIO() if not tee else TeeCaptureIO(self._old) + self.tmpfile = tmpfile + + def repr(self, class_name: str) -> str: + return "<{} {} _old={} _state={!r} tmpfile={!r}>".format( + class_name, + self.name, + hasattr(self, "_old") and repr(self._old) or "", + self._state, + self.tmpfile, + ) + + def __repr__(self) -> str: + return "<{} {} _old={} _state={!r} tmpfile={!r}>".format( + self.__class__.__name__, + self.name, + hasattr(self, "_old") and repr(self._old) or "", + self._state, + self.tmpfile, + ) + + def start(self): + setattr(sys, self.name, self.tmpfile) + self._state = "started" + + def snap(self): + res = self.tmpfile.buffer.getvalue() + self.tmpfile.seek(0) + self.tmpfile.truncate() + return res + + def done(self): + setattr(sys, self.name, self._old) + del self._old + self.tmpfile.close() + self._state = "done" + + def suspend(self): + setattr(sys, self.name, self._old) + self._state = "suspended" + + def resume(self): + setattr(sys, self.name, self.tmpfile) + self._state = "resumed" + + def writeorg(self, data): + self._old.flush() + self._old.buffer.write(data) + self._old.buffer.flush() + + +class SysCapture(SysCaptureBinary): + EMPTY_BUFFER = "" # type: ignore[assignment] + + def snap(self): + res = self.tmpfile.getvalue() + self.tmpfile.seek(0) + self.tmpfile.truncate() + return res + + def writeorg(self, data): + self._old.write(data) + self._old.flush() + + +class FDCaptureBinary: + """Capture IO to/from a given os-level filedescriptor. + + snap() produces `bytes` + """ + + EMPTY_BUFFER = b"" + _state = None + + def __init__(self, targetfd): + self.targetfd = targetfd + + try: + os.fstat(targetfd) + except OSError: + # FD capturing is conceptually simple -- create a temporary file, + # redirect the FD to it, redirect back when done. But when the + # target FD is invalid it throws a wrench into this loveley scheme. + # + # Tests themselves shouldn't care if the FD is valid, FD capturing + # should work regardless of external circumstances. So falling back + # to just sys capturing is not a good option. + # + # Further complications are the need to support suspend() and the + # possibility of FD reuse (e.g. the tmpfile getting the very same + # target FD). The following approach is robust, I believe. + self.targetfd_invalid = os.open(os.devnull, os.O_RDWR) + os.dup2(self.targetfd_invalid, targetfd) + else: + self.targetfd_invalid = None + self.targetfd_save = os.dup(targetfd) + + if targetfd == 0: + self.tmpfile = open(os.devnull) + self.syscapture = SysCapture(targetfd) + else: + self.tmpfile = EncodedFile( + TemporaryFile(buffering=0), + encoding="utf-8", + errors="replace", + write_through=True, + ) + if targetfd in patchsysdict: + self.syscapture = SysCapture(targetfd, self.tmpfile) + else: + self.syscapture = NoCapture() + + def __repr__(self): + return "<{} {} oldfd={} _state={!r} tmpfile={!r}>".format( + self.__class__.__name__, + self.targetfd, + self.targetfd_save, + self._state, + self.tmpfile, + ) + + def start(self): + """ Start capturing on targetfd using memorized tmpfile. """ + os.dup2(self.tmpfile.fileno(), self.targetfd) + self.syscapture.start() + self._state = "started" + + def snap(self): + self.tmpfile.seek(0) + res = self.tmpfile.buffer.read() + self.tmpfile.seek(0) + self.tmpfile.truncate() + return res + + def done(self): + """ stop capturing, restore streams, return original capture file, + seeked to position zero. """ + os.dup2(self.targetfd_save, self.targetfd) + os.close(self.targetfd_save) + if self.targetfd_invalid is not None: + if self.targetfd_invalid != self.targetfd: + os.close(self.targetfd) + os.close(self.targetfd_invalid) + self.syscapture.done() + self.tmpfile.close() + self._state = "done" + + def suspend(self): + self.syscapture.suspend() + os.dup2(self.targetfd_save, self.targetfd) + self._state = "suspended" + + def resume(self): + self.syscapture.resume() + os.dup2(self.tmpfile.fileno(), self.targetfd) + self._state = "resumed" + + def writeorg(self, data): + """ write to original file descriptor. """ + os.write(self.targetfd_save, data) + + +class FDCapture(FDCaptureBinary): + """Capture IO to/from a given os-level filedescriptor. + + snap() produces text + """ + + # Ignore type because it doesn't match the type in the superclass (bytes). + EMPTY_BUFFER = "" # type: ignore + + def snap(self): + self.tmpfile.seek(0) + res = self.tmpfile.read() + self.tmpfile.seek(0) + self.tmpfile.truncate() + return res + + def writeorg(self, data): + """ write to original file descriptor. """ + super().writeorg(data.encode("utf-8")) # XXX use encoding of original stream + + +# MultiCapture + +CaptureResult = collections.namedtuple("CaptureResult", ["out", "err"]) + + +class MultiCapture: + _state = None + _in_suspended = False + + def __init__(self, in_, out, err) -> None: + self.in_ = in_ + self.out = out + self.err = err + + def __repr__(self): + return "".format( + self.out, self.err, self.in_, self._state, self._in_suspended, + ) + + def start_capturing(self): + self._state = "started" + if self.in_: + self.in_.start() + if self.out: + self.out.start() + if self.err: + self.err.start() + + def pop_outerr_to_orig(self): + """ pop current snapshot out/err capture and flush to orig streams. """ + out, err = self.readouterr() + if out: + self.out.writeorg(out) + if err: + self.err.writeorg(err) + return out, err + + def suspend_capturing(self, in_=False): + self._state = "suspended" + if self.out: + self.out.suspend() + if self.err: + self.err.suspend() + if in_ and self.in_: + self.in_.suspend() + self._in_suspended = True + + def resume_capturing(self): + self._state = "resumed" + if self.out: + self.out.resume() + if self.err: + self.err.resume() + if self._in_suspended: + self.in_.resume() + self._in_suspended = False + + def stop_capturing(self): + """ stop capturing and reset capturing streams """ + if self._state == "stopped": + raise ValueError("was already stopped") + self._state = "stopped" + if self.out: + self.out.done() + if self.err: + self.err.done() + if self.in_: + self.in_.done() + + def readouterr(self) -> CaptureResult: + if self.out: + out = self.out.snap() + else: + out = "" + if self.err: + err = self.err.snap() + else: + err = "" + return CaptureResult(out, err) + + +def _get_multicapture(method: "_CaptureMethod") -> MultiCapture: if method == "fd": return MultiCapture(in_=FDCapture(0), out=FDCapture(1), err=FDCapture(2)) elif method == "sys": @@ -81,6 +533,9 @@ def _get_multicapture(method: "_CaptureMethod") -> "MultiCapture": raise ValueError("unknown capturing method: {!r}".format(method)) +# CaptureManager and CaptureFixture + + class CaptureManager: """ Capture plugin, manages that the appropriate capture method is enabled/disabled during collection and each @@ -252,6 +707,69 @@ class CaptureManager: self.stop_global_capturing() +class CaptureFixture: + """ + Object returned by :py:func:`capsys`, :py:func:`capsysbinary`, :py:func:`capfd` and :py:func:`capfdbinary` + fixtures. + """ + + def __init__(self, captureclass, request): + self.captureclass = captureclass + self.request = request + self._capture = None + self._captured_out = self.captureclass.EMPTY_BUFFER + self._captured_err = self.captureclass.EMPTY_BUFFER + + def _start(self): + if self._capture is None: + self._capture = MultiCapture( + in_=None, out=self.captureclass(1), err=self.captureclass(2), + ) + self._capture.start_capturing() + + def close(self): + if self._capture is not None: + out, err = self._capture.pop_outerr_to_orig() + self._captured_out += out + self._captured_err += err + self._capture.stop_capturing() + self._capture = None + + def readouterr(self): + """Read and return the captured output so far, resetting the internal buffer. + + :return: captured content as a namedtuple with ``out`` and ``err`` string attributes + """ + captured_out, captured_err = self._captured_out, self._captured_err + if self._capture is not None: + out, err = self._capture.readouterr() + captured_out += out + captured_err += err + self._captured_out = self.captureclass.EMPTY_BUFFER + self._captured_err = self.captureclass.EMPTY_BUFFER + return CaptureResult(captured_out, captured_err) + + def _suspend(self): + """Suspends this fixture's own capturing temporarily.""" + if self._capture is not None: + self._capture.suspend_capturing() + + def _resume(self): + """Resumes this fixture's own capturing temporarily.""" + if self._capture is not None: + self._capture.resume_capturing() + + @contextlib.contextmanager + def disabled(self): + """Temporarily disables capture while inside the 'with' block.""" + capmanager = self.request.config.pluginmanager.getplugin("capturemanager") + with capmanager.global_and_fixture_disabled(): + yield + + +# The fixtures. + + @pytest.fixture def capsys(request): """Enable text capturing of writes to ``sys.stdout`` and ``sys.stderr``. @@ -318,497 +836,3 @@ def capfdbinary(request): yield capture_fixture capture_fixture.close() capman.unset_fixture() - - -class CaptureIO(io.TextIOWrapper): - def __init__(self) -> None: - super().__init__(io.BytesIO(), encoding="UTF-8", newline="", write_through=True) - - def getvalue(self) -> str: - assert isinstance(self.buffer, io.BytesIO) - return self.buffer.getvalue().decode("UTF-8") - - -class TeeCaptureIO(CaptureIO): - def __init__(self, other: TextIO) -> None: - self._other = other - super().__init__() - - def write(self, s: str) -> int: - super().write(s) - return self._other.write(s) - - -class CaptureFixture: - """ - Object returned by :py:func:`capsys`, :py:func:`capsysbinary`, :py:func:`capfd` and :py:func:`capfdbinary` - fixtures. - """ - - def __init__(self, captureclass, request): - self.captureclass = captureclass - self.request = request - self._capture = None - self._captured_out = self.captureclass.EMPTY_BUFFER - self._captured_err = self.captureclass.EMPTY_BUFFER - - def _start(self): - if self._capture is None: - self._capture = MultiCapture( - in_=None, out=self.captureclass(1), err=self.captureclass(2), - ) - self._capture.start_capturing() - - def close(self): - if self._capture is not None: - out, err = self._capture.pop_outerr_to_orig() - self._captured_out += out - self._captured_err += err - self._capture.stop_capturing() - self._capture = None - - def readouterr(self): - """Read and return the captured output so far, resetting the internal buffer. - - :return: captured content as a namedtuple with ``out`` and ``err`` string attributes - """ - captured_out, captured_err = self._captured_out, self._captured_err - if self._capture is not None: - out, err = self._capture.readouterr() - captured_out += out - captured_err += err - self._captured_out = self.captureclass.EMPTY_BUFFER - self._captured_err = self.captureclass.EMPTY_BUFFER - return CaptureResult(captured_out, captured_err) - - def _suspend(self): - """Suspends this fixture's own capturing temporarily.""" - if self._capture is not None: - self._capture.suspend_capturing() - - def _resume(self): - """Resumes this fixture's own capturing temporarily.""" - if self._capture is not None: - self._capture.resume_capturing() - - @contextlib.contextmanager - def disabled(self): - """Temporarily disables capture while inside the 'with' block.""" - capmanager = self.request.config.pluginmanager.getplugin("capturemanager") - with capmanager.global_and_fixture_disabled(): - yield - - -class EncodedFile(io.TextIOWrapper): - __slots__ = () - - @property - def name(self) -> str: - # Ensure that file.name is a string. Workaround for a Python bug - # fixed in >=3.7.4: https://bugs.python.org/issue36015 - return repr(self.buffer) - - @property - def mode(self) -> str: - # TextIOWrapper doesn't expose a mode, but at least some of our - # tests check it. - return self.buffer.mode.replace("b", "") - - -CaptureResult = collections.namedtuple("CaptureResult", ["out", "err"]) - - -class MultiCapture: - _state = None - _in_suspended = False - - def __init__(self, in_, out, err) -> None: - self.in_ = in_ - self.out = out - self.err = err - - def __repr__(self): - return "".format( - self.out, self.err, self.in_, self._state, self._in_suspended, - ) - - def start_capturing(self): - self._state = "started" - if self.in_: - self.in_.start() - if self.out: - self.out.start() - if self.err: - self.err.start() - - def pop_outerr_to_orig(self): - """ pop current snapshot out/err capture and flush to orig streams. """ - out, err = self.readouterr() - if out: - self.out.writeorg(out) - if err: - self.err.writeorg(err) - return out, err - - def suspend_capturing(self, in_=False): - self._state = "suspended" - if self.out: - self.out.suspend() - if self.err: - self.err.suspend() - if in_ and self.in_: - self.in_.suspend() - self._in_suspended = True - - def resume_capturing(self): - self._state = "resumed" - if self.out: - self.out.resume() - if self.err: - self.err.resume() - if self._in_suspended: - self.in_.resume() - self._in_suspended = False - - def stop_capturing(self): - """ stop capturing and reset capturing streams """ - if self._state == "stopped": - raise ValueError("was already stopped") - self._state = "stopped" - if self.out: - self.out.done() - if self.err: - self.err.done() - if self.in_: - self.in_.done() - - def readouterr(self) -> CaptureResult: - if self.out: - out = self.out.snap() - else: - out = "" - if self.err: - err = self.err.snap() - else: - err = "" - return CaptureResult(out, err) - - -class NoCapture: - EMPTY_BUFFER = None - __init__ = start = done = suspend = resume = lambda *args: None - - -class FDCaptureBinary: - """Capture IO to/from a given os-level filedescriptor. - - snap() produces `bytes` - """ - - EMPTY_BUFFER = b"" - _state = None - - def __init__(self, targetfd): - self.targetfd = targetfd - - try: - os.fstat(targetfd) - except OSError: - # FD capturing is conceptually simple -- create a temporary file, - # redirect the FD to it, redirect back when done. But when the - # target FD is invalid it throws a wrench into this loveley scheme. - # - # Tests themselves shouldn't care if the FD is valid, FD capturing - # should work regardless of external circumstances. So falling back - # to just sys capturing is not a good option. - # - # Further complications are the need to support suspend() and the - # possibility of FD reuse (e.g. the tmpfile getting the very same - # target FD). The following approach is robust, I believe. - self.targetfd_invalid = os.open(os.devnull, os.O_RDWR) - os.dup2(self.targetfd_invalid, targetfd) - else: - self.targetfd_invalid = None - self.targetfd_save = os.dup(targetfd) - - if targetfd == 0: - self.tmpfile = open(os.devnull) - self.syscapture = SysCapture(targetfd) - else: - self.tmpfile = EncodedFile( - TemporaryFile(buffering=0), - encoding="utf-8", - errors="replace", - write_through=True, - ) - if targetfd in patchsysdict: - self.syscapture = SysCapture(targetfd, self.tmpfile) - else: - self.syscapture = NoCapture() - - def __repr__(self): - return "<{} {} oldfd={} _state={!r} tmpfile={!r}>".format( - self.__class__.__name__, - self.targetfd, - self.targetfd_save, - self._state, - self.tmpfile, - ) - - def start(self): - """ Start capturing on targetfd using memorized tmpfile. """ - os.dup2(self.tmpfile.fileno(), self.targetfd) - self.syscapture.start() - self._state = "started" - - def snap(self): - self.tmpfile.seek(0) - res = self.tmpfile.buffer.read() - self.tmpfile.seek(0) - self.tmpfile.truncate() - return res - - def done(self): - """ stop capturing, restore streams, return original capture file, - seeked to position zero. """ - os.dup2(self.targetfd_save, self.targetfd) - os.close(self.targetfd_save) - if self.targetfd_invalid is not None: - if self.targetfd_invalid != self.targetfd: - os.close(self.targetfd) - os.close(self.targetfd_invalid) - self.syscapture.done() - self.tmpfile.close() - self._state = "done" - - def suspend(self): - self.syscapture.suspend() - os.dup2(self.targetfd_save, self.targetfd) - self._state = "suspended" - - def resume(self): - self.syscapture.resume() - os.dup2(self.tmpfile.fileno(), self.targetfd) - self._state = "resumed" - - def writeorg(self, data): - """ write to original file descriptor. """ - os.write(self.targetfd_save, data) - - -class FDCapture(FDCaptureBinary): - """Capture IO to/from a given os-level filedescriptor. - - snap() produces text - """ - - # Ignore type because it doesn't match the type in the superclass (bytes). - EMPTY_BUFFER = "" # type: ignore - - def snap(self): - self.tmpfile.seek(0) - res = self.tmpfile.read() - self.tmpfile.seek(0) - self.tmpfile.truncate() - return res - - def writeorg(self, data): - """ write to original file descriptor. """ - super().writeorg(data.encode("utf-8")) # XXX use encoding of original stream - - -class SysCaptureBinary: - - EMPTY_BUFFER = b"" - _state = None - - def __init__(self, fd, tmpfile=None, *, tee=False): - name = patchsysdict[fd] - self._old = getattr(sys, name) - self.name = name - if tmpfile is None: - if name == "stdin": - tmpfile = DontReadFromInput() - else: - tmpfile = CaptureIO() if not tee else TeeCaptureIO(self._old) - self.tmpfile = tmpfile - - def __repr__(self): - return "<{} {} _old={} _state={!r} tmpfile={!r}>".format( - self.__class__.__name__, - self.name, - hasattr(self, "_old") and repr(self._old) or "", - self._state, - self.tmpfile, - ) - - def start(self): - setattr(sys, self.name, self.tmpfile) - self._state = "started" - - def snap(self): - res = self.tmpfile.buffer.getvalue() - self.tmpfile.seek(0) - self.tmpfile.truncate() - return res - - def done(self): - setattr(sys, self.name, self._old) - del self._old - self.tmpfile.close() - self._state = "done" - - def suspend(self): - setattr(sys, self.name, self._old) - self._state = "suspended" - - def resume(self): - setattr(sys, self.name, self.tmpfile) - self._state = "resumed" - - def writeorg(self, data): - self._old.flush() - self._old.buffer.write(data) - self._old.buffer.flush() - - -class SysCapture(SysCaptureBinary): - EMPTY_BUFFER = "" # type: ignore[assignment] - - def snap(self): - res = self.tmpfile.getvalue() - self.tmpfile.seek(0) - self.tmpfile.truncate() - return res - - def writeorg(self, data): - self._old.write(data) - self._old.flush() - - -class DontReadFromInput: - encoding = None - - def read(self, *args): - raise OSError( - "pytest: reading from stdin while output is captured! Consider using `-s`." - ) - - readline = read - readlines = read - __next__ = read - - def __iter__(self): - return self - - def fileno(self): - raise UnsupportedOperation("redirected stdin is pseudofile, has no fileno()") - - def isatty(self): - return False - - def close(self): - pass - - @property - def buffer(self): - return self - - -def _colorama_workaround(): - """ - Ensure colorama is imported so that it attaches to the correct stdio - handles on Windows. - - colorama uses the terminal on import time. So if something does the - first import of colorama while I/O capture is active, colorama will - fail in various ways. - """ - if sys.platform.startswith("win32"): - try: - import colorama # noqa: F401 - except ImportError: - pass - - -def _readline_workaround(): - """ - Ensure readline is imported so that it attaches to the correct stdio - handles on Windows. - - Pdb uses readline support where available--when not running from the Python - prompt, the readline module is not imported until running the pdb REPL. If - running pytest with the --pdb option this means the readline module is not - imported until after I/O capture has been started. - - This is a problem for pyreadline, which is often used to implement readline - support on Windows, as it does not attach to the correct handles for stdout - and/or stdin if they have been redirected by the FDCapture mechanism. This - workaround ensures that readline is imported before I/O capture is setup so - that it can attach to the actual stdin/out for the console. - - See https://github.com/pytest-dev/pytest/pull/1281 - """ - if sys.platform.startswith("win32"): - try: - import readline # noqa: F401 - except ImportError: - pass - - -def _py36_windowsconsoleio_workaround(stream): - """ - Python 3.6 implemented unicode console handling for Windows. This works - by reading/writing to the raw console handle using - ``{Read,Write}ConsoleW``. - - The problem is that we are going to ``dup2`` over the stdio file - descriptors when doing ``FDCapture`` and this will ``CloseHandle`` the - handles used by Python to write to the console. Though there is still some - weirdness and the console handle seems to only be closed randomly and not - on the first call to ``CloseHandle``, or maybe it gets reopened with the - same handle value when we suspend capturing. - - The workaround in this case will reopen stdio with a different fd which - also means a different handle by replicating the logic in - "Py_lifecycle.c:initstdio/create_stdio". - - :param stream: in practice ``sys.stdout`` or ``sys.stderr``, but given - here as parameter for unittesting purposes. - - See https://github.com/pytest-dev/py/issues/103 - """ - if ( - not sys.platform.startswith("win32") - or sys.version_info[:2] < (3, 6) - or hasattr(sys, "pypy_version_info") - ): - return - - # bail out if ``stream`` doesn't seem like a proper ``io`` stream (#2666) - if not hasattr(stream, "buffer"): - return - - buffered = hasattr(stream.buffer, "raw") - raw_stdout = stream.buffer.raw if buffered else stream.buffer - - if not isinstance(raw_stdout, io._WindowsConsoleIO): - return - - def _reopen_stdio(f, mode): - if not buffered and mode[0] == "w": - buffering = 0 - else: - buffering = -1 - - return io.TextIOWrapper( - open(os.dup(f.fileno()), mode, buffering), - f.encoding, - f.errors, - f.newlines, - f.line_buffering, - ) - - sys.stdin = _reopen_stdio(sys.stdin, "rb") - sys.stdout = _reopen_stdio(sys.stdout, "wb") - sys.stderr = _reopen_stdio(sys.stderr, "wb")