file_storage.py 6.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196
  1. from __future__ import annotations
  2. import mimetypes
  3. from io import BytesIO
  4. from os import fsdecode
  5. from os import fspath
  6. from .._internal import _plain_int
  7. from .structures import MultiDict
  class FileStorage:
      """The :class:`FileStorage` class is a thin wrapper over incoming files.
      It is used by the request object to represent uploaded files. All the
      attributes of the wrapper stream are proxied by the file storage so
      it's possible to do ``storage.read()`` instead of the long form
      ``storage.stream.read()``.

      :param stream: the wrapped file-like object. A falsy value is
          replaced by an empty :class:`io.BytesIO`.
      :param filename: the filename to report. When omitted, the
          ``name`` attribute of ``stream`` is used if it is path-like
          and not a special ``<...>`` name.
      :param name: the name of the form field, stored as :attr:`name`.
      :param content_type: if given, stored as the ``Content-Type``
          header.
      :param content_length: if given, stored (as a string) as the
          ``Content-Length`` header.
      :param headers: a mapping of headers. When omitted, a new
          ``Headers`` object is created.
      """

      def __init__(
          self,
          stream=None,
          filename=None,
          name=None,
          content_type=None,
          content_length=None,
          headers=None,
      ):
          self.name = name
          self.stream = stream or BytesIO()

          # If no filename is provided, attempt to get the filename from
          # the stream object. Python names special streams like
          # ``<stderr>`` with angular brackets, skip these streams.
          if filename is None:
              stream_name = getattr(stream, "name", None)

              # Streams opened from a file descriptor report an ``int``
              # name, which ``fsdecode`` rejects with ``TypeError``. Only
              # path-like names are usable as a filename.
              if isinstance(stream_name, (str, bytes)) or hasattr(
                  stream_name, "__fspath__"
              ):
                  filename = fsdecode(stream_name)

              if filename and filename[0] == "<" and filename[-1] == ">":
                  filename = None
          else:
              filename = fsdecode(filename)

          self.filename = filename

          if headers is None:
              # Imported lazily, only when a default container is needed.
              from .headers import Headers

              headers = Headers()

          self.headers = headers

          if content_type is not None:
              headers["Content-Type"] = content_type

          if content_length is not None:
              headers["Content-Length"] = str(content_length)

      def _parse_content_type(self) -> None:
          """Parse :attr:`content_type` once and cache the result.

          The parsed value is stored on ``_parsed_content_type`` the
          first time this runs. Later changes to the headers are not
          picked up because the cached value is reused.
          """
          if not hasattr(self, "_parsed_content_type"):
              self._parsed_content_type = http.parse_options_header(self.content_type)

      @property
      def content_type(self):
          """The content-type sent in the header. Usually not available"""
          return self.headers.get("content-type")

      @property
      def content_length(self) -> int:
          """The content-length sent in the header. Usually not available

          Returns ``0`` when the header is missing or when its value is
          rejected with :exc:`ValueError`.
          """
          if "content-length" in self.headers:
              try:
                  return _plain_int(self.headers["content-length"])
              except ValueError:
                  pass

          return 0

      @property
      def mimetype(self):
          """Like :attr:`content_type`, but without parameters (eg, without
          charset, type etc.) and always lowercase. For example if the content
          type is ``text/HTML; charset=utf-8`` the mimetype would be
          ``'text/html'``.

          .. versionadded:: 0.7
          """
          self._parse_content_type()
          return self._parsed_content_type[0].lower()

      @property
      def mimetype_params(self):
          """The mimetype parameters as dict. For example if the content
          type is ``text/html; charset=utf-8`` the params would be
          ``{'charset': 'utf-8'}``.

          .. versionadded:: 0.7
          """
          self._parse_content_type()
          return self._parsed_content_type[1]

      def save(self, dst, buffer_size=16384) -> None:
          """Save the file to a destination path or file object. If the
          destination is a file object you have to close it yourself after the
          call. The buffer size is the number of bytes held in memory during
          the copy process. It defaults to 16KB.

          For secure file saving also have a look at :func:`secure_filename`.

          :param dst: a filename (``str`` or ``bytes``),
              :class:`os.PathLike`, or open file object to write to.
          :param buffer_size: Passed as the ``length`` parameter of
              :func:`shutil.copyfileobj`.

          .. versionchanged:: 1.0
              Supports :mod:`pathlib`.
          """
          from shutil import copyfileobj

          if hasattr(dst, "__fspath__"):
              dst = fspath(dst)

          # ``fspath`` may return ``bytes``, and callers may pass a bytes
          # path directly; both are paths, not file objects.
          if isinstance(dst, (str, bytes)):
              # A file opened here is owned here, so it is always closed,
              # even when the copy fails.
              with open(dst, "wb") as out:
                  copyfileobj(self.stream, out, buffer_size)
          else:
              copyfileobj(self.stream, dst, buffer_size)

      def close(self) -> None:
          """Close the underlying file if possible."""
          try:
              self.stream.close()
          except Exception:
              # Deliberately best-effort: a failing close is ignored.
              pass

      def __bool__(self) -> bool:
          """A storage is truthy only when it has a non-empty filename."""
          return bool(self.filename)

      def __getattr__(self, name):
          """Proxy unknown attributes to the wrapped stream.

          :raises AttributeError: if neither the stream nor its ``_file``
              backing object provides ``name``, or if ``stream`` itself
              has not been set on this instance.
          """
          # ``__getattr__`` only runs after normal lookup failed, so being
          # asked for ``stream`` means it was never assigned (for example
          # on an instance created with ``__new__``). Looking it up again
          # through ``self.stream`` would recurse without bound.
          if name == "stream":
              raise AttributeError(
                  f"{type(self).__name__!r} object has no attribute {name!r}"
              )

          stream = self.stream

          try:
              return getattr(stream, name)
          except AttributeError:
              # SpooledTemporaryFile doesn't implement IOBase, get the
              # attribute from its backing file instead.
              # https://github.com/python/cpython/pull/3249
              if hasattr(stream, "_file"):
                  return getattr(stream._file, name)
              raise

      def __iter__(self):
          """Iterate over the wrapped stream."""
          return iter(self.stream)

      def __repr__(self) -> str:
          return f"<{type(self).__name__}: {self.filename!r} ({self.content_type!r})>"
  class FileMultiDict(MultiDict):
      """A special :class:`MultiDict` that has convenience methods to add
      files to it. This is used for :class:`EnvironBuilder` and generally
      useful for unittesting.

      .. versionadded:: 0.5
      """

      def add_file(self, name, file, filename=None, content_type=None):
          """Adds a new file to the dict. `file` can be a file name, an
          :class:`os.PathLike`, a :class:`file`-like or a
          :class:`FileStorage` object.

          A :class:`FileStorage` is added as is; ``filename`` and
          ``content_type`` are ignored in that case. A path is opened
          for reading in binary mode and handed to the new
          :class:`FileStorage`; it is not closed by this method.

          :param name: the name of the field.
          :param file: a filename, :class:`os.PathLike`, or
              :class:`file`-like object
          :param filename: an optional filename. Defaults to the path
              when ``file`` is a path.
          :param content_type: an optional content type. When omitted
              and a filename is known, it is guessed from the filename,
              falling back to ``application/octet-stream``.
          """
          if isinstance(file, FileStorage):
              self.add(name, file)
              return

          # Accept path-like objects as well as plain strings, matching
          # what ``FileStorage.save`` accepts as a destination.
          if isinstance(file, str) or hasattr(file, "__fspath__"):
              if filename is None:
                  # ``fsdecode`` returns a ``str`` unchanged and converts
                  # path-like objects to ``str``.
                  filename = fsdecode(file)

              file = open(file, "rb")

          if filename and content_type is None:
              content_type = (
                  mimetypes.guess_type(filename)[0] or "application/octet-stream"
              )

          self.add(name, FileStorage(file, filename, name, content_type))
  155. # circular dependencies
  156. from .. import http