_internal.py 5.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214
  1. from __future__ import annotations
  2. import logging
  3. import re
  4. import sys
  5. import typing as t
  6. from datetime import datetime
  7. from datetime import timezone
  8. if t.TYPE_CHECKING:
  9. from _typeshed.wsgi import WSGIEnvironment
  10. from .wrappers.request import Request
  11. _logger: logging.Logger | None = None
  12. class _Missing:
  13. def __repr__(self) -> str:
  14. return "no value"
  15. def __reduce__(self) -> str:
  16. return "_missing"
  17. _missing = _Missing()
  18. def _wsgi_decoding_dance(s: str) -> str:
  19. return s.encode("latin1").decode(errors="replace")
  20. def _wsgi_encoding_dance(s: str) -> str:
  21. return s.encode().decode("latin1")
  22. def _get_environ(obj: WSGIEnvironment | Request) -> WSGIEnvironment:
  23. env = getattr(obj, "environ", obj)
  24. assert isinstance(
  25. env, dict
  26. ), f"{type(obj).__name__!r} is not a WSGI environment (has to be a dict)"
  27. return env
  28. def _has_level_handler(logger: logging.Logger) -> bool:
  29. """Check if there is a handler in the logging chain that will handle
  30. the given logger's effective level.
  31. """
  32. level = logger.getEffectiveLevel()
  33. current = logger
  34. while current:
  35. if any(handler.level <= level for handler in current.handlers):
  36. return True
  37. if not current.propagate:
  38. break
  39. current = current.parent # type: ignore
  40. return False
  41. class _ColorStreamHandler(logging.StreamHandler):
  42. """On Windows, wrap stream with Colorama for ANSI style support."""
  43. def __init__(self) -> None:
  44. try:
  45. import colorama
  46. except ImportError:
  47. stream = None
  48. else:
  49. stream = colorama.AnsiToWin32(sys.stderr)
  50. super().__init__(stream)
  51. def _log(type: str, message: str, *args: t.Any, **kwargs: t.Any) -> None:
  52. """Log a message to the 'werkzeug' logger.
  53. The logger is created the first time it is needed. If there is no
  54. level set, it is set to :data:`logging.INFO`. If there is no handler
  55. for the logger's effective level, a :class:`logging.StreamHandler`
  56. is added.
  57. """
  58. global _logger
  59. if _logger is None:
  60. _logger = logging.getLogger("werkzeug")
  61. if _logger.level == logging.NOTSET:
  62. _logger.setLevel(logging.INFO)
  63. if not _has_level_handler(_logger):
  64. _logger.addHandler(_ColorStreamHandler())
  65. getattr(_logger, type)(message.rstrip(), *args, **kwargs)
  66. @t.overload
  67. def _dt_as_utc(dt: None) -> None:
  68. ...
  69. @t.overload
  70. def _dt_as_utc(dt: datetime) -> datetime:
  71. ...
  72. def _dt_as_utc(dt: datetime | None) -> datetime | None:
  73. if dt is None:
  74. return dt
  75. if dt.tzinfo is None:
  76. return dt.replace(tzinfo=timezone.utc)
  77. elif dt.tzinfo != timezone.utc:
  78. return dt.astimezone(timezone.utc)
  79. return dt
  80. _TAccessorValue = t.TypeVar("_TAccessorValue")
  81. class _DictAccessorProperty(t.Generic[_TAccessorValue]):
  82. """Baseclass for `environ_property` and `header_property`."""
  83. read_only = False
  84. def __init__(
  85. self,
  86. name: str,
  87. default: _TAccessorValue | None = None,
  88. load_func: t.Callable[[str], _TAccessorValue] | None = None,
  89. dump_func: t.Callable[[_TAccessorValue], str] | None = None,
  90. read_only: bool | None = None,
  91. doc: str | None = None,
  92. ) -> None:
  93. self.name = name
  94. self.default = default
  95. self.load_func = load_func
  96. self.dump_func = dump_func
  97. if read_only is not None:
  98. self.read_only = read_only
  99. self.__doc__ = doc
  100. def lookup(self, instance: t.Any) -> t.MutableMapping[str, t.Any]:
  101. raise NotImplementedError
  102. @t.overload
  103. def __get__(
  104. self, instance: None, owner: type
  105. ) -> _DictAccessorProperty[_TAccessorValue]:
  106. ...
  107. @t.overload
  108. def __get__(self, instance: t.Any, owner: type) -> _TAccessorValue:
  109. ...
  110. def __get__(
  111. self, instance: t.Any | None, owner: type
  112. ) -> _TAccessorValue | _DictAccessorProperty[_TAccessorValue]:
  113. if instance is None:
  114. return self
  115. storage = self.lookup(instance)
  116. if self.name not in storage:
  117. return self.default # type: ignore
  118. value = storage[self.name]
  119. if self.load_func is not None:
  120. try:
  121. return self.load_func(value)
  122. except (ValueError, TypeError):
  123. return self.default # type: ignore
  124. return value # type: ignore
  125. def __set__(self, instance: t.Any, value: _TAccessorValue) -> None:
  126. if self.read_only:
  127. raise AttributeError("read only property")
  128. if self.dump_func is not None:
  129. self.lookup(instance)[self.name] = self.dump_func(value)
  130. else:
  131. self.lookup(instance)[self.name] = value
  132. def __delete__(self, instance: t.Any) -> None:
  133. if self.read_only:
  134. raise AttributeError("read only property")
  135. self.lookup(instance).pop(self.name, None)
  136. def __repr__(self) -> str:
  137. return f"<{type(self).__name__} {self.name}>"
  138. _plain_int_re = re.compile(r"-?\d+", re.ASCII)
  139. def _plain_int(value: str) -> int:
  140. """Parse an int only if it is only ASCII digits and ``-``.
  141. This disallows ``+``, ``_``, and non-ASCII digits, which are accepted by ``int`` but
  142. are not allowed in HTTP header values.
  143. Any leading or trailing whitespace is stripped
  144. """
  145. value = value.strip()
  146. if _plain_int_re.fullmatch(value) is None:
  147. raise ValueError
  148. return int(value)