console.py 5.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219
  1. from __future__ import annotations
  2. import code
  3. import sys
  4. import typing as t
  5. from contextvars import ContextVar
  6. from types import CodeType
  7. from markupsafe import escape
  8. from .repr import debug_repr
  9. from .repr import dump
  10. from .repr import helper
# Output buffer for the current context. ThreadedStream.push() stores a fresh
# HTMLStringO here; while nothing is set, ThreadedStream falls back to
# sys.__stdout__ and fetch() returns an empty string.
_stream: ContextVar[HTMLStringO] = ContextVar("werkzeug.debug.console.stream")
# The _InteractiveConsole evaluating code in the current context. Set by
# Console.eval() and read by ThreadedStream.displayhook() to update "_".
_ipy: ContextVar[_InteractiveConsole] = ContextVar("werkzeug.debug.console.ipy")
  13. class HTMLStringO:
  14. """A StringO version that HTML escapes on write."""
  15. def __init__(self) -> None:
  16. self._buffer: list[str] = []
  17. def isatty(self) -> bool:
  18. return False
  19. def close(self) -> None:
  20. pass
  21. def flush(self) -> None:
  22. pass
  23. def seek(self, n: int, mode: int = 0) -> None:
  24. pass
  25. def readline(self) -> str:
  26. if len(self._buffer) == 0:
  27. return ""
  28. ret = self._buffer[0]
  29. del self._buffer[0]
  30. return ret
  31. def reset(self) -> str:
  32. val = "".join(self._buffer)
  33. del self._buffer[:]
  34. return val
  35. def _write(self, x: str) -> None:
  36. self._buffer.append(x)
  37. def write(self, x: str) -> None:
  38. self._write(escape(x))
  39. def writelines(self, x: t.Iterable[str]) -> None:
  40. self._write(escape("".join(x)))
  41. class ThreadedStream:
  42. """Thread-local wrapper for sys.stdout for the interactive console."""
  43. @staticmethod
  44. def push() -> None:
  45. if not isinstance(sys.stdout, ThreadedStream):
  46. sys.stdout = t.cast(t.TextIO, ThreadedStream())
  47. _stream.set(HTMLStringO())
  48. @staticmethod
  49. def fetch() -> str:
  50. try:
  51. stream = _stream.get()
  52. except LookupError:
  53. return ""
  54. return stream.reset()
  55. @staticmethod
  56. def displayhook(obj: object) -> None:
  57. try:
  58. stream = _stream.get()
  59. except LookupError:
  60. return _displayhook(obj) # type: ignore
  61. # stream._write bypasses escaping as debug_repr is
  62. # already generating HTML for us.
  63. if obj is not None:
  64. _ipy.get().locals["_"] = obj
  65. stream._write(debug_repr(obj))
  66. def __setattr__(self, name: str, value: t.Any) -> None:
  67. raise AttributeError(f"read only attribute {name}")
  68. def __dir__(self) -> list[str]:
  69. return dir(sys.__stdout__)
  70. def __getattribute__(self, name: str) -> t.Any:
  71. try:
  72. stream = _stream.get()
  73. except LookupError:
  74. stream = sys.__stdout__ # type: ignore[assignment]
  75. return getattr(stream, name)
  76. def __repr__(self) -> str:
  77. return repr(sys.__stdout__)
# Add the threaded stream as display hook. The previously installed hook is
# kept in _displayhook so ThreadedStream.displayhook can delegate to it when
# no console stream is set for the current context.
_displayhook = sys.displayhook
sys.displayhook = ThreadedStream.displayhook
  81. class _ConsoleLoader:
  82. def __init__(self) -> None:
  83. self._storage: dict[int, str] = {}
  84. def register(self, code: CodeType, source: str) -> None:
  85. self._storage[id(code)] = source
  86. # register code objects of wrapped functions too.
  87. for var in code.co_consts:
  88. if isinstance(var, CodeType):
  89. self._storage[id(var)] = source
  90. def get_source_by_code(self, code: CodeType) -> str | None:
  91. try:
  92. return self._storage[id(code)]
  93. except KeyError:
  94. return None
class _InteractiveConsole(code.InteractiveInterpreter):
    """Interpreter behind :class:`Console` that returns its output as HTML.

    Statements run in one namespace built from the given globals and locals.
    Every successfully compiled code object is registered with a
    ``_ConsoleLoader`` that is exposed in that namespace as ``__loader__``.
    """

    locals: dict[str, t.Any]

    def __init__(self, globals: dict[str, t.Any], locals: dict[str, t.Any]) -> None:
        """Build the namespace and wrap the compiler.

        :param globals: Names to expose; overridden by *locals* on conflict.
        :param locals: Names to expose; override *globals* on conflict.
        """
        self.loader = _ConsoleLoader()
        # Later entries win: locals shadow globals, and the three console
        # helpers shadow both.
        locals = {
            **globals,
            **locals,
            "dump": dump,
            "help": helper,
            "__loader__": self.loader,
        }
        super().__init__(locals)
        original_compile = self.compile

        # Wrap the compiler so that each code object it produces is registered
        # with the loader together with the source it came from.
        def compile(source: str, filename: str, symbol: str) -> CodeType | None:
            code = original_compile(source, filename, symbol)

            # None means nothing was compiled, so there is nothing to register.
            if code is not None:
                self.loader.register(code, source)

            return code

        self.compile = compile  # type: ignore[assignment]
        # True while a multi-line statement is still being entered.
        self.more = False
        # Lines of the statement entered so far while ``more`` is true.
        self.buffer: list[str] = []

    def runsource(self, source: str, **kwargs: t.Any) -> str:  # type: ignore
        """Run one line of input and return the console transcript for it.

        :param source: The line entered by the user.
        :param kwargs: Accepted but not used.
        :return: The prompt, the HTML-escaped source and the output captured
            while running it, concatenated.
        """
        # Normalise trailing whitespace to exactly one newline.
        source = f"{source.rstrip()}\n"
        ThreadedStream.push()
        # The prompt reflects the state *before* this line is processed.
        prompt = "... " if self.more else ">>> "
        try:
            source_to_eval = "".join(self.buffer + [source])
            # The base class returns a true value when the input is incomplete
            # and more lines are needed.
            if super().runsource(source_to_eval, "<debugger>", "single"):
                self.more = True
                self.buffer.append(source)
            else:
                self.more = False
                del self.buffer[:]
        finally:
            # Always drain the captured output, even if running raised.
            output = ThreadedStream.fetch()
        return f"{prompt}{escape(source)}{output}"

    def runcode(self, code: CodeType) -> None:
        """Execute *code* in the console namespace, rendering any ``Exception``."""
        try:
            exec(code, self.locals)
        except Exception:
            self.showtraceback()

    def showtraceback(self) -> None:
        """Write the current exception's traceback as HTML to the console."""
        from .tbtools import DebugTraceback

        exc = t.cast(BaseException, sys.exc_info()[1])
        # NOTE(review): skip=1 presumably hides the runcode frame itself;
        # confirm against DebugTraceback before changing the call structure.
        te = DebugTraceback(exc, skip=1)
        # _write bypasses escaping because the rendered traceback is HTML.
        sys.stdout._write(te.render_traceback_html())  # type: ignore

    def showsyntaxerror(self, filename: str | None = None) -> None:
        """Write the current syntax error as HTML; *filename* is not used."""
        from .tbtools import DebugTraceback

        exc = t.cast(BaseException, sys.exc_info()[1])
        # NOTE(review): skip=4 presumably hides the frames between runsource
        # and the compiler; confirm against DebugTraceback before changing
        # the compile wrapper or the call structure.
        te = DebugTraceback(exc, skip=4)
        # _write bypasses escaping because the rendered traceback is HTML.
        sys.stdout._write(te.render_traceback_html())  # type: ignore

    def write(self, data: str) -> None:
        """Write *data* to the current ``sys.stdout``."""
        sys.stdout.write(data)
  148. class Console:
  149. """An interactive console."""
  150. def __init__(
  151. self,
  152. globals: dict[str, t.Any] | None = None,
  153. locals: dict[str, t.Any] | None = None,
  154. ) -> None:
  155. if locals is None:
  156. locals = {}
  157. if globals is None:
  158. globals = {}
  159. self._ipy = _InteractiveConsole(globals, locals)
  160. def eval(self, code: str) -> str:
  161. _ipy.set(self._ipy)
  162. old_sys_stdout = sys.stdout
  163. try:
  164. return self._ipy.runsource(code)
  165. finally:
  166. sys.stdout = old_sys_stdout