range.py 5.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180
  1. from __future__ import annotations
  2. class IfRange:
  3. """Very simple object that represents the `If-Range` header in parsed
  4. form. It will either have neither a etag or date or one of either but
  5. never both.
  6. .. versionadded:: 0.7
  7. """
  8. def __init__(self, etag=None, date=None):
  9. #: The etag parsed and unquoted. Ranges always operate on strong
  10. #: etags so the weakness information is not necessary.
  11. self.etag = etag
  12. #: The date in parsed format or `None`.
  13. self.date = date
  14. def to_header(self):
  15. """Converts the object back into an HTTP header."""
  16. if self.date is not None:
  17. return http.http_date(self.date)
  18. if self.etag is not None:
  19. return http.quote_etag(self.etag)
  20. return ""
  21. def __str__(self):
  22. return self.to_header()
  23. def __repr__(self):
  24. return f"<{type(self).__name__} {str(self)!r}>"
  25. class Range:
  26. """Represents a ``Range`` header. All methods only support only
  27. bytes as the unit. Stores a list of ranges if given, but the methods
  28. only work if only one range is provided.
  29. :raise ValueError: If the ranges provided are invalid.
  30. .. versionchanged:: 0.15
  31. The ranges passed in are validated.
  32. .. versionadded:: 0.7
  33. """
  34. def __init__(self, units, ranges):
  35. #: The units of this range. Usually "bytes".
  36. self.units = units
  37. #: A list of ``(begin, end)`` tuples for the range header provided.
  38. #: The ranges are non-inclusive.
  39. self.ranges = ranges
  40. for start, end in ranges:
  41. if start is None or (end is not None and (start < 0 or start >= end)):
  42. raise ValueError(f"{(start, end)} is not a valid range.")
  43. def range_for_length(self, length):
  44. """If the range is for bytes, the length is not None and there is
  45. exactly one range and it is satisfiable it returns a ``(start, stop)``
  46. tuple, otherwise `None`.
  47. """
  48. if self.units != "bytes" or length is None or len(self.ranges) != 1:
  49. return None
  50. start, end = self.ranges[0]
  51. if end is None:
  52. end = length
  53. if start < 0:
  54. start += length
  55. if http.is_byte_range_valid(start, end, length):
  56. return start, min(end, length)
  57. return None
  58. def make_content_range(self, length):
  59. """Creates a :class:`~werkzeug.datastructures.ContentRange` object
  60. from the current range and given content length.
  61. """
  62. rng = self.range_for_length(length)
  63. if rng is not None:
  64. return ContentRange(self.units, rng[0], rng[1], length)
  65. return None
  66. def to_header(self):
  67. """Converts the object back into an HTTP header."""
  68. ranges = []
  69. for begin, end in self.ranges:
  70. if end is None:
  71. ranges.append(f"{begin}-" if begin >= 0 else str(begin))
  72. else:
  73. ranges.append(f"{begin}-{end - 1}")
  74. return f"{self.units}={','.join(ranges)}"
  75. def to_content_range_header(self, length):
  76. """Converts the object into `Content-Range` HTTP header,
  77. based on given length
  78. """
  79. range = self.range_for_length(length)
  80. if range is not None:
  81. return f"{self.units} {range[0]}-{range[1] - 1}/{length}"
  82. return None
  83. def __str__(self):
  84. return self.to_header()
  85. def __repr__(self):
  86. return f"<{type(self).__name__} {str(self)!r}>"
  87. def _callback_property(name):
  88. def fget(self):
  89. return getattr(self, name)
  90. def fset(self, value):
  91. setattr(self, name, value)
  92. if self.on_update is not None:
  93. self.on_update(self)
  94. return property(fget, fset)
  95. class ContentRange:
  96. """Represents the content range header.
  97. .. versionadded:: 0.7
  98. """
  99. def __init__(self, units, start, stop, length=None, on_update=None):
  100. assert http.is_byte_range_valid(start, stop, length), "Bad range provided"
  101. self.on_update = on_update
  102. self.set(start, stop, length, units)
  103. #: The units to use, usually "bytes"
  104. units = _callback_property("_units")
  105. #: The start point of the range or `None`.
  106. start = _callback_property("_start")
  107. #: The stop point of the range (non-inclusive) or `None`. Can only be
  108. #: `None` if also start is `None`.
  109. stop = _callback_property("_stop")
  110. #: The length of the range or `None`.
  111. length = _callback_property("_length")
  112. def set(self, start, stop, length=None, units="bytes"):
  113. """Simple method to update the ranges."""
  114. assert http.is_byte_range_valid(start, stop, length), "Bad range provided"
  115. self._units = units
  116. self._start = start
  117. self._stop = stop
  118. self._length = length
  119. if self.on_update is not None:
  120. self.on_update(self)
  121. def unset(self):
  122. """Sets the units to `None` which indicates that the header should
  123. no longer be used.
  124. """
  125. self.set(None, None, units=None)
  126. def to_header(self):
  127. if self.units is None:
  128. return ""
  129. if self.length is None:
  130. length = "*"
  131. else:
  132. length = self.length
  133. if self.start is None:
  134. return f"{self.units} */{length}"
  135. return f"{self.units} {self.start}-{self.stop - 1}/{length}"
  136. def __bool__(self):
  137. return self.units is not None
  138. def __str__(self):
  139. return self.to_header()
  140. def __repr__(self):
  141. return f"<{type(self).__name__} {str(self)!r}>"
  142. # circular dependencies
  143. from .. import http