123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321 |
- from __future__ import annotations
- import re
- import typing as t
- from dataclasses import dataclass
- from enum import auto
- from enum import Enum
- from ..datastructures import Headers
- from ..exceptions import RequestEntityTooLarge
- from ..http import parse_options_header
class Event:
    """Common base type for the objects exchanged with the multipart codec."""
@dataclass(frozen=True)
class Preamble(Event):
    """Event carrying the bytes that precede the first boundary."""

    # Raw bytes found before the first boundary line.
    data: bytes
@dataclass(frozen=True)
class Field(Event):
    """Event marking the start of a part that has no filename."""

    # Value of the ``name`` option of the Content-Disposition header.
    name: str
    # All headers of the part.
    headers: Headers
@dataclass(frozen=True)
class File(Event):
    """Event marking the start of a part that declares a filename."""

    # Value of the ``name`` option of the Content-Disposition header.
    name: str
    # Value of the ``filename`` option of the Content-Disposition header.
    filename: str
    # All headers of the part.
    headers: Headers
@dataclass(frozen=True)
class Data(Event):
    """Event carrying a chunk of the body of the current part."""

    # The body bytes contained in this chunk (may be empty).
    data: bytes
    # ``True`` while further chunks of the same part are expected,
    # ``False`` for the chunk that ends the part.
    more_data: bool
@dataclass(frozen=True)
class Epilogue(Event):
    """Event carrying the bytes that follow the closing boundary."""

    # Raw bytes found after the closing boundary line.
    data: bytes
class NeedData(Event):
    """Event signalling that more input is required before decoding can go on."""


# Shared instance handed out whenever no other event can be produced yet.
NEED_DATA = NeedData()
class State(Enum):
    """Positions within a multipart message tracked by the decoder and encoder."""

    # Before the first boundary has been seen.
    PREAMBLE = auto()
    # At the headers of a part.
    PART = auto()
    # Inside the body of a part, after its first chunk.
    DATA = auto()
    # Headers are done; the first chunk of the body comes next.
    DATA_START = auto()
    # After the closing boundary.
    EPILOGUE = auto()
    # Nothing is left to process.
    COMPLETE = auto()
# RFC-7578 requires multipart line breaks to be CRLF (\r\n), but many
# implementations get this wrong and emit a lone CR or a lone LF, so all
# three forms are accepted.
LINE_BREAK = b"(?:\r\n|\n|\r)"
# Two identical line breaks in a row, i.e. the blank line that ends the
# headers of a part.
BLANK_LINE_RE = re.compile(b"(?:\r\n\r\n|\r\r|\n\n)", re.MULTILINE)
LINE_BREAK_RE = re.compile(LINE_BREAK, re.MULTILINE)
# A line break followed by a space or tab continues the previous header
# value, as per RFC2231.
HEADER_CONTINUATION_RE = re.compile(LINE_BREAK + b"[ \t]", re.MULTILINE)
# Number of extra trailing bytes that are searched again when more data
# arrives. It must be long enough to contain any line breaks plus any
# additional boundary markers (--) so that a target split across two
# chunks is still found by the subsequent search.
SEARCH_EXTRA_LENGTH = 8
class MultipartDecoder:
    """Decodes a multipart message as bytes into Python events.

    The part data is returned as available to allow the caller to save
    the data from memory to disk, if desired.

    Bytes are supplied through :meth:`receive_data` and events are pulled
    with :meth:`next_event`, which returns ``NEED_DATA`` whenever the
    buffered bytes are not enough to produce another event.
    """

    def __init__(
        self,
        boundary: bytes,
        max_form_memory_size: int | None = None,
        *,
        max_parts: int | None = None,
    ) -> None:
        """Create a decoder for a message delimited by ``boundary``.

        :param boundary: The multipart boundary, without the leading
            ``--``.
        :param max_form_memory_size: Maximum number of bytes the internal
            buffer may hold; ``None`` disables the check.
        :param max_parts: Maximum number of parts that may be decoded;
            ``None`` disables the check.
        """
        self.buffer = bytearray()
        self.complete = False
        self.max_form_memory_size = max_form_memory_size
        self.max_parts = max_parts
        self.state = State.PREAMBLE
        self.boundary = boundary

        # Note in the below \h i.e. horizontal whitespace is used
        # as [^\S\n\r] as \h isn't supported in python.

        # The preamble must end with a boundary where the boundary is
        # prefixed by a line break, RFC2046. Except that many
        # implementations including Werkzeug's tests omit the line
        # break prefix. In addition the first boundary could be the
        # epilogue boundary (for empty form-data) hence the matching
        # group to understand if it is an epilogue boundary.
        self.preamble_re = re.compile(
            rb"%s?--%s(--[^\S\n\r]*%s?|[^\S\n\r]*%s)"
            % (LINE_BREAK, re.escape(boundary), LINE_BREAK, LINE_BREAK),
            re.MULTILINE,
        )
        # A boundary must include a line break prefix and suffix, and
        # may include trailing whitespace. In addition the boundary
        # could be the epilogue boundary hence the matching group to
        # understand if it is an epilogue boundary.
        self.boundary_re = re.compile(
            rb"%s--%s(--[^\S\n\r]*%s?|[^\S\n\r]*%s)"
            % (LINE_BREAK, re.escape(boundary), LINE_BREAK, LINE_BREAK),
            re.MULTILINE,
        )
        # Offset from which the next regex search starts, so bytes that
        # were already searched are not scanned again.
        self._search_position = 0
        self._parts_decoded = 0

    def last_newline(self, data: bytes | bytearray) -> int:
        """Return the index at which the final line break in ``data`` starts.

        A ``\\r\\n`` pair counts as a single line break starting at the
        ``\\r``. If ``data`` contains neither ``\\r`` nor ``\\n`` the
        length of ``data`` is returned.

        Only the final line break can be the start of a boundary that is
        still incomplete, so everything before the returned index is safe
        to hand out as part data. Taking the smaller of the last ``\\n``
        and last ``\\r`` positions instead is wrong when the two are not
        adjacent: it points at an unrelated, earlier line break and makes
        a trailing ``\\r`` (or ``\\n--bou...``) look like ordinary data.
        """
        last_nl = data.rfind(b"\n")
        last_cr = data.rfind(b"\r")

        if last_nl == -1 and last_cr == -1:
            return len(data)

        if last_cr != -1 and last_cr + 1 == last_nl:
            # The final line break is a CRLF pair; it starts at the CR.
            return last_cr

        return max(last_nl, last_cr)

    def receive_data(self, data: bytes | None) -> None:
        """Append ``data`` to the buffer, or mark the input as finished.

        :param data: Bytes to buffer, or ``None`` to signal that no more
            data will arrive.
        :raises RequestEntityTooLarge: If buffering ``data`` would make
            the buffer larger than ``max_form_memory_size``.
        """
        if data is None:
            self.complete = True
        elif (
            self.max_form_memory_size is not None
            and len(self.buffer) + len(data) > self.max_form_memory_size
        ):
            raise RequestEntityTooLarge()
        else:
            self.buffer.extend(data)

    def next_event(self) -> Event:
        """Decode and return the next event from the buffered bytes.

        :return: The next event, or ``NEED_DATA`` when the buffer does
            not hold enough bytes to produce one.
        :raises ValueError: If a part has no Content-Disposition header,
            or if the input was marked complete and no further event can
            be produced.
        :raises RequestEntityTooLarge: If more than ``max_parts`` parts
            have been decoded.
        """
        event: Event = NEED_DATA

        if self.state == State.PREAMBLE:
            match = self.preamble_re.search(self.buffer, self._search_position)
            if match is not None:
                if match.group(1).startswith(b"--"):
                    self.state = State.EPILOGUE
                else:
                    self.state = State.PART
                data = bytes(self.buffer[: match.start()])
                del self.buffer[: match.end()]
                event = Preamble(data=data)
                self._search_position = 0
            else:
                # Update the search start position to be equal to the
                # current buffer length (already searched) minus a
                # safe buffer for part of the search target.
                self._search_position = max(
                    0, len(self.buffer) - len(self.boundary) - SEARCH_EXTRA_LENGTH
                )

        elif self.state == State.PART:
            match = BLANK_LINE_RE.search(self.buffer, self._search_position)
            if match is not None:
                headers = self._parse_headers(self.buffer[: match.start()])
                # The final header ends with a single CRLF, however a
                # blank line indicates the start of the
                # body. Therefore the end is after the first CRLF.
                headers_end = (match.start() + match.end()) // 2
                del self.buffer[:headers_end]

                if "content-disposition" not in headers:
                    raise ValueError("Missing Content-Disposition header")

                _, extra = parse_options_header(headers["content-disposition"])
                name = t.cast(str, extra.get("name"))
                filename = extra.get("filename")
                if filename is not None:
                    event = File(
                        filename=filename,
                        headers=headers,
                        name=name,
                    )
                else:
                    event = Field(
                        headers=headers,
                        name=name,
                    )
                self.state = State.DATA_START
                self._search_position = 0
                self._parts_decoded += 1

                if self.max_parts is not None and self._parts_decoded > self.max_parts:
                    raise RequestEntityTooLarge()
            else:
                # Update the search start position to be equal to the
                # current buffer length (already searched) minus a
                # safe buffer for part of the search target.
                self._search_position = max(0, len(self.buffer) - SEARCH_EXTRA_LENGTH)

        elif self.state == State.DATA_START:
            data, del_index, more_data = self._parse_data(self.buffer, start=True)
            del self.buffer[:del_index]
            event = Data(data=data, more_data=more_data)
            if more_data:
                self.state = State.DATA

        elif self.state == State.DATA:
            data, del_index, more_data = self._parse_data(self.buffer, start=False)
            del self.buffer[:del_index]
            # An empty chunk is only reported when it ends the part.
            if data or not more_data:
                event = Data(data=data, more_data=more_data)

        elif self.state == State.EPILOGUE and self.complete:
            event = Epilogue(data=bytes(self.buffer))
            del self.buffer[:]
            self.state = State.COMPLETE

        if self.complete and isinstance(event, NeedData):
            raise ValueError(f"Invalid form-data cannot parse beyond {self.state}")

        return event

    def _parse_headers(self, data: bytes | bytearray) -> Headers:
        """Parse the header block of a part into a ``Headers`` object."""
        headers: list[tuple[str, str]] = []
        # Merge the continued headers into one line
        data = HEADER_CONTINUATION_RE.sub(b" ", data)
        # Now there is one header per line
        for line in data.splitlines():
            line = line.strip()

            if line != b"":
                name, _, value = line.decode().partition(":")
                headers.append((name.strip(), value.strip()))
        return Headers(headers)

    def _parse_data(
        self, data: bytes | bytearray, *, start: bool
    ) -> tuple[bytes, int, bool]:
        """Extract as much body data as is safe from ``data``.

        May set ``self.state`` to ``PART`` or ``EPILOGUE`` when a complete
        boundary is found.

        :param data: The buffered bytes, beginning at the current part's
            unread body.
        :param start: ``True`` for the first chunk of a part, in which
            case ``data`` begins with the line break that ends the
            headers.
        :return: A tuple of the body bytes, the number of leading bytes
            of ``data`` that have been consumed, and whether more data
            for the same part is expected.
        """
        # Body parts must start with CRLF (or CR or LF)
        if start:
            match = LINE_BREAK_RE.match(data)
            data_start = t.cast(t.Match[bytes], match).end()
        else:
            data_start = 0

        boundary = b"--" + self.boundary

        if data.find(boundary) == -1:
            # No complete boundary in the buffer, but there may be
            # a partial boundary at the end. A boundary starts with a
            # line break, so only the final line break can begin one;
            # return everything up to it as data.
            data_end = del_index = self.last_newline(data[data_start:]) + data_start
            # If amount of data after last newline is far from
            # possible length of partial boundary, we should
            # assume that there is no partial boundary in the buffer
            # and return all pending data.
            if (len(data) - data_end) > len(b"\n" + boundary):
                data_end = del_index = len(data)
            more_data = True
        else:
            match = self.boundary_re.search(data)
            if match is not None:
                if match.group(1).startswith(b"--"):
                    self.state = State.EPILOGUE
                else:
                    self.state = State.PART
                data_end = match.start()
                del_index = match.end()
            else:
                # The boundary text is present but is not (yet) a full
                # boundary line; hold back everything from the final
                # line break onwards.
                data_end = del_index = self.last_newline(data[data_start:]) + data_start
            more_data = match is None

        return bytes(data[data_start:data_end]), del_index, more_data
class MultipartEncoder:
    """Encodes a sequence of events into the bytes of a multipart message."""

    def __init__(self, boundary: bytes) -> None:
        """Create an encoder that delimits parts with ``boundary``.

        :param boundary: The multipart boundary, without the leading
            ``--``.
        """
        self.boundary = boundary
        self.state = State.PREAMBLE
        # True from the moment the headers of a part have been written
        # until the blank line separating them from the body is written.
        self._blank_line_pending = False

    def send_event(self, event: Event) -> bytes:
        """Return the bytes that represent ``event`` in the message.

        :param event: The next event of the message.
        :return: The encoded bytes for ``event``.
        :raises ValueError: If ``event`` is not allowed in the current
            state.
        """
        if isinstance(event, Preamble) and self.state == State.PREAMBLE:
            self.state = State.PART
            return event.data
        elif isinstance(event, (Field, File)) and self.state in {
            State.PREAMBLE,
            State.PART,
            State.DATA,
        }:
            # NOTE(review): name and filename are written without escaping
            # quotes or line breaks - confirm callers only pass safe values.
            data = b"\r\n--" + self.boundary + b"\r\n"
            data += b'Content-Disposition: form-data; name="%s"' % event.name.encode()
            if isinstance(event, File):
                data += b'; filename="%s"' % event.filename.encode()
            data += b"\r\n"
            for name, value in t.cast(Field, event).headers:
                # The Content-Disposition line was generated above.
                if name.lower() != "content-disposition":
                    data += f"{name}: {value}\r\n".encode()
            self.state = State.DATA_START
            self._blank_line_pending = True
            return data
        elif isinstance(event, Data) and self.state in {
            State.DATA_START,
            State.DATA,
        }:
            self.state = State.DATA
            # The blank line between headers and body is written together
            # with the first non-empty chunk. For a part without any body
            # bytes the line break in front of the next boundary takes its
            # place. Tracking this separately from the state keeps a part
            # well-formed when its first chunk is empty but a later chunk
            # is not.
            if event.data and self._blank_line_pending:
                self._blank_line_pending = False
                return b"\r\n" + event.data
            return event.data
        elif isinstance(event, Epilogue):
            self.state = State.COMPLETE
            return b"\r\n--" + self.boundary + b"--\r\n" + event.data
        else:
            raise ValueError(f"Cannot generate {event} in state: {self.state}")
|