lint.py 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438
  1. """
  2. WSGI Protocol Linter
  3. ====================
  4. This module provides a middleware that performs sanity checks on the
  5. behavior of the WSGI server and application. It checks that the
  6. :pep:`3333` WSGI spec is properly implemented. It also warns on some
  7. common HTTP errors such as non-empty responses for 304 status codes.
  8. .. autoclass:: LintMiddleware
  9. :copyright: 2007 Pallets
  10. :license: BSD-3-Clause
  11. """
  12. from __future__ import annotations
  13. import typing as t
  14. from types import TracebackType
  15. from urllib.parse import urlparse
  16. from warnings import warn
  17. from ..datastructures import Headers
  18. from ..http import is_entity_header
  19. from ..wsgi import FileWrapper
  20. if t.TYPE_CHECKING:
  21. from _typeshed.wsgi import StartResponse
  22. from _typeshed.wsgi import WSGIApplication
  23. from _typeshed.wsgi import WSGIEnvironment
  24. class WSGIWarning(Warning):
  25. """Warning class for WSGI warnings."""
  26. class HTTPWarning(Warning):
  27. """Warning class for HTTP warnings."""
  28. def check_type(context: str, obj: object, need: type = str) -> None:
  29. if type(obj) is not need:
  30. warn(
  31. f"{context!r} requires {need.__name__!r}, got {type(obj).__name__!r}.",
  32. WSGIWarning,
  33. stacklevel=3,
  34. )
  35. class InputStream:
  36. def __init__(self, stream: t.IO[bytes]) -> None:
  37. self._stream = stream
  38. def read(self, *args: t.Any) -> bytes:
  39. if len(args) == 0:
  40. warn(
  41. "WSGI does not guarantee an EOF marker on the input stream, thus making"
  42. " calls to 'wsgi.input.read()' unsafe. Conforming servers may never"
  43. " return from this call.",
  44. WSGIWarning,
  45. stacklevel=2,
  46. )
  47. elif len(args) != 1:
  48. warn(
  49. "Too many parameters passed to 'wsgi.input.read()'.",
  50. WSGIWarning,
  51. stacklevel=2,
  52. )
  53. return self._stream.read(*args)
  54. def readline(self, *args: t.Any) -> bytes:
  55. if len(args) == 0:
  56. warn(
  57. "Calls to 'wsgi.input.readline()' without arguments are unsafe. Use"
  58. " 'wsgi.input.read()' instead.",
  59. WSGIWarning,
  60. stacklevel=2,
  61. )
  62. elif len(args) == 1:
  63. warn(
  64. "'wsgi.input.readline()' was called with a size hint. WSGI does not"
  65. " support this, although it's available on all major servers.",
  66. WSGIWarning,
  67. stacklevel=2,
  68. )
  69. else:
  70. raise TypeError("Too many arguments passed to 'wsgi.input.readline()'.")
  71. return self._stream.readline(*args)
  72. def __iter__(self) -> t.Iterator[bytes]:
  73. try:
  74. return iter(self._stream)
  75. except TypeError:
  76. warn("'wsgi.input' is not iterable.", WSGIWarning, stacklevel=2)
  77. return iter(())
  78. def close(self) -> None:
  79. warn("The application closed the input stream!", WSGIWarning, stacklevel=2)
  80. self._stream.close()
  81. class ErrorStream:
  82. def __init__(self, stream: t.IO[str]) -> None:
  83. self._stream = stream
  84. def write(self, s: str) -> None:
  85. check_type("wsgi.error.write()", s, str)
  86. self._stream.write(s)
  87. def flush(self) -> None:
  88. self._stream.flush()
  89. def writelines(self, seq: t.Iterable[str]) -> None:
  90. for line in seq:
  91. self.write(line)
  92. def close(self) -> None:
  93. warn("The application closed the error stream!", WSGIWarning, stacklevel=2)
  94. self._stream.close()
  95. class GuardedWrite:
  96. def __init__(self, write: t.Callable[[bytes], object], chunks: list[int]) -> None:
  97. self._write = write
  98. self._chunks = chunks
  99. def __call__(self, s: bytes) -> None:
  100. check_type("write()", s, bytes)
  101. self._write(s)
  102. self._chunks.append(len(s))
  103. class GuardedIterator:
  104. def __init__(
  105. self,
  106. iterator: t.Iterable[bytes],
  107. headers_set: tuple[int, Headers],
  108. chunks: list[int],
  109. ) -> None:
  110. self._iterator = iterator
  111. self._next = iter(iterator).__next__
  112. self.closed = False
  113. self.headers_set = headers_set
  114. self.chunks = chunks
  115. def __iter__(self) -> GuardedIterator:
  116. return self
  117. def __next__(self) -> bytes:
  118. if self.closed:
  119. warn("Iterated over closed 'app_iter'.", WSGIWarning, stacklevel=2)
  120. rv = self._next()
  121. if not self.headers_set:
  122. warn(
  123. "The application returned before it started the response.",
  124. WSGIWarning,
  125. stacklevel=2,
  126. )
  127. check_type("application iterator items", rv, bytes)
  128. self.chunks.append(len(rv))
  129. return rv
  130. def close(self) -> None:
  131. self.closed = True
  132. if hasattr(self._iterator, "close"):
  133. self._iterator.close()
  134. if self.headers_set:
  135. status_code, headers = self.headers_set
  136. bytes_sent = sum(self.chunks)
  137. content_length = headers.get("content-length", type=int)
  138. if status_code == 304:
  139. for key, _value in headers:
  140. key = key.lower()
  141. if key not in ("expires", "content-location") and is_entity_header(
  142. key
  143. ):
  144. warn(
  145. f"Entity header {key!r} found in 304 response.",
  146. HTTPWarning,
  147. stacklevel=2,
  148. )
  149. if bytes_sent:
  150. warn(
  151. "304 responses must not have a body.",
  152. HTTPWarning,
  153. stacklevel=2,
  154. )
  155. elif 100 <= status_code < 200 or status_code == 204:
  156. if content_length != 0:
  157. warn(
  158. f"{status_code} responses must have an empty content length.",
  159. HTTPWarning,
  160. stacklevel=2,
  161. )
  162. if bytes_sent:
  163. warn(
  164. f"{status_code} responses must not have a body.",
  165. HTTPWarning,
  166. stacklevel=2,
  167. )
  168. elif content_length is not None and content_length != bytes_sent:
  169. warn(
  170. "Content-Length and the number of bytes sent to the"
  171. " client do not match.",
  172. WSGIWarning,
  173. stacklevel=2,
  174. )
  175. def __del__(self) -> None:
  176. if not self.closed:
  177. try:
  178. warn(
  179. "Iterator was garbage collected before it was closed.",
  180. WSGIWarning,
  181. stacklevel=2,
  182. )
  183. except Exception:
  184. pass
  185. class LintMiddleware:
  186. """Warns about common errors in the WSGI and HTTP behavior of the
  187. server and wrapped application. Some of the issues it checks are:
  188. - invalid status codes
  189. - non-bytes sent to the WSGI server
  190. - strings returned from the WSGI application
  191. - non-empty conditional responses
  192. - unquoted etags
  193. - relative URLs in the Location header
  194. - unsafe calls to wsgi.input
  195. - unclosed iterators
  196. Error information is emitted using the :mod:`warnings` module.
  197. :param app: The WSGI application to wrap.
  198. .. code-block:: python
  199. from werkzeug.middleware.lint import LintMiddleware
  200. app = LintMiddleware(app)
  201. """
  202. def __init__(self, app: WSGIApplication) -> None:
  203. self.app = app
  204. def check_environ(self, environ: WSGIEnvironment) -> None:
  205. if type(environ) is not dict: # noqa: E721
  206. warn(
  207. "WSGI environment is not a standard Python dict.",
  208. WSGIWarning,
  209. stacklevel=4,
  210. )
  211. for key in (
  212. "REQUEST_METHOD",
  213. "SERVER_NAME",
  214. "SERVER_PORT",
  215. "wsgi.version",
  216. "wsgi.input",
  217. "wsgi.errors",
  218. "wsgi.multithread",
  219. "wsgi.multiprocess",
  220. "wsgi.run_once",
  221. ):
  222. if key not in environ:
  223. warn(
  224. f"Required environment key {key!r} not found",
  225. WSGIWarning,
  226. stacklevel=3,
  227. )
  228. if environ["wsgi.version"] != (1, 0):
  229. warn("Environ is not a WSGI 1.0 environ.", WSGIWarning, stacklevel=3)
  230. script_name = environ.get("SCRIPT_NAME", "")
  231. path_info = environ.get("PATH_INFO", "")
  232. if script_name and script_name[0] != "/":
  233. warn(
  234. f"'SCRIPT_NAME' does not start with a slash: {script_name!r}",
  235. WSGIWarning,
  236. stacklevel=3,
  237. )
  238. if path_info and path_info[0] != "/":
  239. warn(
  240. f"'PATH_INFO' does not start with a slash: {path_info!r}",
  241. WSGIWarning,
  242. stacklevel=3,
  243. )
  244. def check_start_response(
  245. self,
  246. status: str,
  247. headers: list[tuple[str, str]],
  248. exc_info: None | (tuple[type[BaseException], BaseException, TracebackType]),
  249. ) -> tuple[int, Headers]:
  250. check_type("status", status, str)
  251. status_code_str = status.split(None, 1)[0]
  252. if len(status_code_str) != 3 or not status_code_str.isdecimal():
  253. warn("Status code must be three digits.", WSGIWarning, stacklevel=3)
  254. if len(status) < 4 or status[3] != " ":
  255. warn(
  256. f"Invalid value for status {status!r}. Valid status strings are three"
  257. " digits, a space and a status explanation.",
  258. WSGIWarning,
  259. stacklevel=3,
  260. )
  261. status_code = int(status_code_str)
  262. if status_code < 100:
  263. warn("Status code < 100 detected.", WSGIWarning, stacklevel=3)
  264. if type(headers) is not list: # noqa: E721
  265. warn("Header list is not a list.", WSGIWarning, stacklevel=3)
  266. for item in headers:
  267. if type(item) is not tuple or len(item) != 2:
  268. warn("Header items must be 2-item tuples.", WSGIWarning, stacklevel=3)
  269. name, value = item
  270. if type(name) is not str or type(value) is not str: # noqa: E721
  271. warn(
  272. "Header keys and values must be strings.", WSGIWarning, stacklevel=3
  273. )
  274. if name.lower() == "status":
  275. warn(
  276. "The status header is not supported due to"
  277. " conflicts with the CGI spec.",
  278. WSGIWarning,
  279. stacklevel=3,
  280. )
  281. if exc_info is not None and not isinstance(exc_info, tuple):
  282. warn("Invalid value for exc_info.", WSGIWarning, stacklevel=3)
  283. headers = Headers(headers)
  284. self.check_headers(headers)
  285. return status_code, headers
  286. def check_headers(self, headers: Headers) -> None:
  287. etag = headers.get("etag")
  288. if etag is not None:
  289. if etag.startswith(("W/", "w/")):
  290. if etag.startswith("w/"):
  291. warn(
  292. "Weak etag indicator should be upper case.",
  293. HTTPWarning,
  294. stacklevel=4,
  295. )
  296. etag = etag[2:]
  297. if not (etag[:1] == etag[-1:] == '"'):
  298. warn("Unquoted etag emitted.", HTTPWarning, stacklevel=4)
  299. location = headers.get("location")
  300. if location is not None:
  301. if not urlparse(location).netloc:
  302. warn(
  303. "Absolute URLs required for location header.",
  304. HTTPWarning,
  305. stacklevel=4,
  306. )
  307. def check_iterator(self, app_iter: t.Iterable[bytes]) -> None:
  308. if isinstance(app_iter, str):
  309. warn(
  310. "The application returned a string. The response will send one"
  311. " character at a time to the client, which will kill performance."
  312. " Return a list or iterable instead.",
  313. WSGIWarning,
  314. stacklevel=3,
  315. )
  316. def __call__(self, *args: t.Any, **kwargs: t.Any) -> t.Iterable[bytes]:
  317. if len(args) != 2:
  318. warn("A WSGI app takes two arguments.", WSGIWarning, stacklevel=2)
  319. if kwargs:
  320. warn(
  321. "A WSGI app does not take keyword arguments.", WSGIWarning, stacklevel=2
  322. )
  323. environ: WSGIEnvironment = args[0]
  324. start_response: StartResponse = args[1]
  325. self.check_environ(environ)
  326. environ["wsgi.input"] = InputStream(environ["wsgi.input"])
  327. environ["wsgi.errors"] = ErrorStream(environ["wsgi.errors"])
  328. # Hook our own file wrapper in so that applications will always
  329. # iterate to the end and we can check the content length.
  330. environ["wsgi.file_wrapper"] = FileWrapper
  331. headers_set: list[t.Any] = []
  332. chunks: list[int] = []
  333. def checking_start_response(
  334. *args: t.Any, **kwargs: t.Any
  335. ) -> t.Callable[[bytes], None]:
  336. if len(args) not in {2, 3}:
  337. warn(
  338. f"Invalid number of arguments: {len(args)}, expected 2 or 3.",
  339. WSGIWarning,
  340. stacklevel=2,
  341. )
  342. if kwargs:
  343. warn(
  344. "'start_response' does not take keyword arguments.",
  345. WSGIWarning,
  346. stacklevel=2,
  347. )
  348. status: str = args[0]
  349. headers: list[tuple[str, str]] = args[1]
  350. exc_info: None | (
  351. tuple[type[BaseException], BaseException, TracebackType]
  352. ) = args[2] if len(args) == 3 else None
  353. headers_set[:] = self.check_start_response(status, headers, exc_info)
  354. return GuardedWrite(start_response(status, headers, exc_info), chunks)
  355. app_iter = self.app(environ, t.cast("StartResponse", checking_start_response))
  356. self.check_iterator(app_iter)
  357. return GuardedIterator(
  358. app_iter, t.cast(t.Tuple[int, Headers], headers_set), chunks
  359. )