12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109 |
- """A WSGI and HTTP server for use **during development only**. This
- server is convenient to use, but is not designed to be particularly
- stable, secure, or efficient. Use a dedicated WSGI server and HTTP
- server when deploying to production.
- It provides features like interactive debugging and code reloading. Use
- ``run_simple`` to start the server. Put this in a ``run.py`` script:
- .. code-block:: python
- from myapp import create_app
- from werkzeug import run_simple
- """
- from __future__ import annotations
- import errno
- import io
- import os
- import selectors
- import socket
- import socketserver
- import sys
- import typing as t
- from datetime import datetime as dt
- from datetime import timedelta
- from datetime import timezone
- from http.server import BaseHTTPRequestHandler
- from http.server import HTTPServer
- from urllib.parse import unquote
- from urllib.parse import urlsplit
- from ._internal import _log
- from ._internal import _wsgi_encoding_dance
- from .exceptions import InternalServerError
- from .urls import uri_to_iri
try:
    import ssl
except ImportError:
    # The interpreter has no ``ssl`` module. Bind a stand-in so the module
    # still imports; the failure is deferred until something touches ``ssl``.

    class _SslDummy:
        """Stand-in for the ``ssl`` module that fails on any attribute access."""

        def __getattr__(self, name: str) -> t.Any:
            reason = (
                "SSL is unavailable because this Python runtime was not"
                " compiled with SSL/TLS support."
            )
            raise RuntimeError(reason)  # noqa: B904

    ssl = _SslDummy()  # type: ignore
# ANSI styling of log lines is on by default. On Windows it is only kept
# when ``colorama`` can be imported.
_log_add_style = True

if os.name == "nt":
    try:
        __import__("colorama")
    except ImportError:
        _log_add_style = False

# Whether this platform exposes ``os.fork``.
can_fork = hasattr(os, "fork")

if not can_fork:

    class ForkingMixIn:  # type: ignore
        """Empty placeholder used when the platform cannot fork."""

else:
    ForkingMixIn = socketserver.ForkingMixIn

# ``socket.AF_UNIX`` where it exists, otherwise ``None``.
af_unix = getattr(socket, "AF_UNIX", None)  # type: ignore
# Used as ``BaseWSGIServer.request_queue_size``.
LISTEN_QUEUE = 128

# Accepted forms of the ``ssl_context`` argument: a ready context, a
# ``(cert_file, key_file)`` tuple, the literal ``"adhoc"``, or ``None``.
_TSSLContextArg = t.Union[
    "ssl.SSLContext",
    t.Tuple[str, t.Optional[str]],
    t.Literal["adhoc"],
    None,
]

if t.TYPE_CHECKING:
    # Imported for annotations only; never needed at runtime.
    from _typeshed.wsgi import WSGIApplication
    from _typeshed.wsgi import WSGIEnvironment
    from cryptography.hazmat.primitives.asymmetric.rsa import (
        RSAPrivateKeyWithSerialization,
    )
    from cryptography.x509 import Certificate
class DechunkedInput(io.RawIOBase):
    """An input stream that handles Transfer-Encoding 'chunked'.

    Wraps a binary stream that yields a chunked body and exposes the decoded
    payload through the raw I/O ``readinto`` protocol.

    :param rfile: The binary stream to read the chunked body from.
    """

    def __init__(self, rfile: t.IO[bytes]) -> None:
        self._rfile = rfile
        # Set once the terminating zero-length chunk has been seen.
        self._done = False
        # Bytes of the current chunk that have not been handed out yet.
        self._len = 0

    def readable(self) -> bool:
        return True

    def read_chunk_len(self) -> int:
        """Read one chunk-size line and return the size as an integer.

        :raises OSError: If the line is not a hexadecimal number, or the
            number is negative.
        """
        try:
            line = self._rfile.readline().decode("latin1")
            _len = int(line.strip(), 16)
        except ValueError as e:
            raise OSError("Invalid chunk header") from e

        if _len < 0:
            raise OSError("Negative chunk length not allowed")

        return _len

    def readinto(self, buf: bytearray) -> int:  # type: ignore
        """Fill ``buf`` with decoded body bytes and return the count written.

        Stops when ``buf`` is full or the final zero-length chunk was read.

        :raises OSError: If a chunk header is invalid, a chunk terminator is
            missing, or the underlying stream ends inside a chunk.
        """
        read = 0

        while not self._done and read < len(buf):
            if self._len == 0:
                # This is the first chunk or we fully consumed the previous
                # one. Read the length of the next chunk.
                self._len = self.read_chunk_len()

            if self._len == 0:
                # Found the final chunk of size 0. The stream is now exhausted,
                # but there is still a final newline that should be consumed.
                self._done = True

            if self._len > 0:
                # Never request more than fits in the remaining space of buf
                # or more than is left in the current chunk.
                n = min(len(buf) - read, self._len)
                data = self._rfile.read(n)

                if not data:
                    # The stream ended in the middle of a chunk. Assigning a
                    # short result into buf would either raise ValueError
                    # (memoryview) or silently shrink it (bytearray).
                    raise OSError("Unexpected end of chunked stream")

                # Account only for what was actually returned so that a short
                # read is continued on the next loop iteration.
                got = len(data)
                buf[read : read + got] = data
                self._len -= got
                read += got

            if self._len == 0:
                # Skip the terminating newline of a chunk that has been fully
                # consumed. This also applies to the 0-sized final chunk.
                terminator = self._rfile.readline()

                if terminator not in (b"\n", b"\r\n", b"\r"):
                    raise OSError("Missing chunk terminating newline")

        return read
class WSGIRequestHandler(BaseHTTPRequestHandler):
    """A request handler that implements WSGI dispatching.

    Any ``do_<METHOD>`` attribute lookup resolves to :meth:`run_wsgi`, so
    every HTTP method is passed to the WSGI application held by the server.
    """

    server: BaseWSGIServer

    @property
    def server_version(self) -> str:  # type: ignore
        """The version string stored on the owning server."""
        return self.server._server_version

    def make_environ(self) -> WSGIEnvironment:
        """Build the WSGI environ dict for the request being handled."""
        request_url = urlsplit(self.path)
        url_scheme = "http" if self.server.ssl_context is None else "https"

        # Normalise client_address to a ``(host, port)`` tuple.
        if not self.client_address:
            self.client_address = ("<local>", 0)
        elif isinstance(self.client_address, str):
            self.client_address = (self.client_address, 0)

        # If there was no scheme but the path started with two slashes,
        # the first segment may have been incorrectly parsed as the
        # netloc, prepend it to the path again.
        if not request_url.scheme and request_url.netloc:
            path_info = f"/{request_url.netloc}{request_url.path}"
        else:
            path_info = request_url.path

        path_info = unquote(path_info)

        environ: WSGIEnvironment = {
            "wsgi.version": (1, 0),
            "wsgi.url_scheme": url_scheme,
            "wsgi.input": self.rfile,
            "wsgi.errors": sys.stderr,
            "wsgi.multithread": self.server.multithread,
            "wsgi.multiprocess": self.server.multiprocess,
            "wsgi.run_once": False,
            "werkzeug.socket": self.connection,
            "SERVER_SOFTWARE": self.server_version,
            "REQUEST_METHOD": self.command,
            "SCRIPT_NAME": "",
            "PATH_INFO": _wsgi_encoding_dance(path_info),
            "QUERY_STRING": _wsgi_encoding_dance(request_url.query),
            # Non-standard, added by mod_wsgi, uWSGI
            "REQUEST_URI": _wsgi_encoding_dance(self.path),
            # Non-standard, added by gunicorn
            "RAW_URI": _wsgi_encoding_dance(self.path),
            "REMOTE_ADDR": self.address_string(),
            "REMOTE_PORT": self.port_integer(),
            "SERVER_NAME": self.server.server_address[0],
            "SERVER_PORT": str(self.server.server_address[1]),
            "SERVER_PROTOCOL": self.request_version,
        }

        for key, value in self.headers.items():
            # Header names containing an underscore are dropped; after the
            # dash-to-underscore conversion below they would be
            # indistinguishable from the dashed spelling.
            if "_" in key:
                continue

            key = key.upper().replace("-", "_")
            value = value.replace("\r\n", "")

            if key not in ("CONTENT_TYPE", "CONTENT_LENGTH"):
                key = f"HTTP_{key}"

                # Repeated headers are joined into one comma separated value.
                if key in environ:
                    value = f"{environ[key]},{value}"

            environ[key] = value

        if environ.get("HTTP_TRANSFER_ENCODING", "").strip().lower() == "chunked":
            environ["wsgi.input_terminated"] = True
            environ["wsgi.input"] = DechunkedInput(environ["wsgi.input"])

        # Per RFC 2616, if the URL is absolute, use that as the host.
        # We're using "has a scheme" to indicate an absolute URL.
        if request_url.scheme and request_url.netloc:
            environ["HTTP_HOST"] = request_url.netloc

        try:
            # binary_form=False gives nicer information, but wouldn't be compatible with
            # what Nginx or Apache could return.
            peer_cert = self.connection.getpeercert(binary_form=True)

            if peer_cert is not None:
                # Nginx and Apache use PEM format.
                environ["SSL_CLIENT_CERT"] = ssl.DER_cert_to_PEM_cert(peer_cert)
        except ValueError:
            # SSL handshake hasn't finished.
            self.server.log("error", "Cannot fetch SSL peer certificate info")
        except AttributeError:
            # Not using TLS, the socket will not have getpeercert().
            pass

        return environ

    def run_wsgi(self) -> None:
        """Run the server's WSGI application for the current request.

        Connection errors are passed to :meth:`connection_dropped`. Any other
        exception is re-raised when the server has ``passthrough_errors``
        set; otherwise a 500 response is attempted and the traceback logged.
        """
        if self.headers.get("Expect", "").lower().strip() == "100-continue":
            self.wfile.write(b"HTTP/1.1 100 Continue\r\n\r\n")

        self.environ = environ = self.make_environ()
        # ``*_set`` holds what the application passed to start_response,
        # ``*_sent`` what has actually been written to the client.
        status_set: str | None = None
        headers_set: list[tuple[str, str]] | None = None
        status_sent: str | None = None
        headers_sent: list[tuple[str, str]] | None = None
        chunk_response: bool = False

        def write(data: bytes) -> None:
            nonlocal status_sent, headers_sent, chunk_response
            assert status_set is not None, "write() before start_response"
            assert headers_set is not None, "write() before start_response"

            if status_sent is None:
                # First write: emit the status line and headers.
                status_sent = status_set
                headers_sent = headers_set

                try:
                    code_str, msg = status_sent.split(None, 1)
                except ValueError:
                    code_str, msg = status_sent, ""

                code = int(code_str)
                self.send_response(code, msg)
                header_keys = set()

                for key, value in headers_sent:
                    self.send_header(key, value)
                    header_keys.add(key.lower())

                # Use chunked transfer encoding if there is no content
                # length. Do not use for 1xx and 204 responses. 304
                # responses and HEAD requests are also excluded, which
                # is the more conservative behavior and matches other
                # parts of the code.
                # https://httpwg.org/specs/rfc7230.html#rfc.section.3.3.1
                if (
                    not (
                        "content-length" in header_keys
                        or environ["REQUEST_METHOD"] == "HEAD"
                        or (100 <= code < 200)
                        or code in {204, 304}
                    )
                    and self.protocol_version >= "HTTP/1.1"
                ):
                    chunk_response = True
                    self.send_header("Transfer-Encoding", "chunked")

                # Always close the connection. This disables HTTP/1.1
                # keep-alive connections. They aren't handled well by
                # Python's http.server because it doesn't know how to
                # drain the stream before the next request line.
                self.send_header("Connection", "close")
                self.end_headers()

            assert isinstance(data, bytes), "applications must write bytes"

            if data:
                if chunk_response:
                    self.wfile.write(hex(len(data))[2:].encode())
                    self.wfile.write(b"\r\n")

                self.wfile.write(data)

                if chunk_response:
                    self.wfile.write(b"\r\n")

            self.wfile.flush()

        def start_response(status, headers, exc_info=None):  # type: ignore
            nonlocal status_set, headers_set

            if exc_info:
                try:
                    # Headers already went out, the response can't be
                    # replaced any more, so surface the original error.
                    if headers_sent:
                        raise exc_info[1].with_traceback(exc_info[2])
                finally:
                    exc_info = None
            elif headers_set:
                raise AssertionError("Headers already set")

            status_set = status
            headers_set = headers
            return write

        def execute(app: WSGIApplication) -> None:
            application_iter = app(environ, start_response)

            try:
                for data in application_iter:
                    write(data)

                if not headers_sent:
                    write(b"")

                if chunk_response:
                    self.wfile.write(b"0\r\n\r\n")
            finally:
                try:
                    # Check for any remaining data in the read socket, and discard it.
                    # This will read past request.max_content_length, but lets the
                    # client see a 413 response instead of a connection reset failure.
                    # If we supported keep-alive connections, this naive approach would
                    # break by reading the next request line. Since we know that write
                    # (above) closes every connection we can read everything.
                    #
                    # The selector is a context manager so it is released even if
                    # reading from the socket raises.
                    with selectors.DefaultSelector() as selector:
                        selector.register(self.connection, selectors.EVENT_READ)
                        total_size = 0
                        total_reads = 0

                        # A timeout of 0 tends to fail because a client needs a small
                        # amount of time to continue sending its data.
                        while selector.select(timeout=0.01):
                            # Only read 10MB into memory at a time.
                            data = self.rfile.read(10_000_000)
                            total_size += len(data)
                            total_reads += 1

                            # Stop reading on no data, >=10GB, or 1000 reads. If a
                            # client sends more than that, they'll get a connection
                            # reset failure.
                            if (
                                not data
                                or total_size >= 10_000_000_000
                                or total_reads > 1000
                            ):
                                break
                finally:
                    # The iterable's close() must run even when draining the
                    # request body failed.
                    if hasattr(application_iter, "close"):
                        application_iter.close()

        try:
            execute(self.server.app)
        except (ConnectionError, socket.timeout) as e:
            self.connection_dropped(e, environ)
        except Exception as e:
            if self.server.passthrough_errors:
                raise

            if status_sent is not None and chunk_response:
                self.close_connection = True

            try:
                # if we haven't yet sent the headers but they are set
                # we roll back to be able to set them again.
                if status_sent is None:
                    status_set = None
                    headers_set = None
                execute(InternalServerError())
            except Exception:
                # Best effort only; the original error is logged below.
                pass

            from .debug.tbtools import DebugTraceback

            msg = DebugTraceback(e).render_traceback_text()
            self.server.log("error", f"Error on request:\n{msg}")

    def handle(self) -> None:
        """Handles a request ignoring dropped connections."""
        try:
            super().handle()
        except (ConnectionError, socket.timeout) as e:
            self.connection_dropped(e)
        except Exception as e:
            if self.server.ssl_context is not None and is_ssl_error(e):
                self.log_error("SSL error occurred: %s", e)
            else:
                raise

    def connection_dropped(
        self, error: BaseException, environ: WSGIEnvironment | None = None
    ) -> None:
        """Called if the connection was closed by the client. By default
        nothing happens.
        """

    def __getattr__(self, name: str) -> t.Any:
        # All HTTP methods are handled by run_wsgi.
        if name.startswith("do_"):
            return self.run_wsgi

        # All other attributes are forwarded to the base class.
        return getattr(super(), name)

    def address_string(self) -> str:
        """Return the client address used in the environ and in log lines."""
        if getattr(self, "environ", None):
            return self.environ["REMOTE_ADDR"]  # type: ignore

        if not self.client_address:
            return "<local>"

        return self.client_address[0]

    def port_integer(self) -> int:
        """Return the port element of ``client_address``."""
        return self.client_address[1]

    # Escape control characters. This is defined (but private) in Python 3.12.
    _control_char_table = str.maketrans(
        {c: rf"\x{c:02x}" for c in [*range(0x20), *range(0x7F, 0xA0)]}
    )
    _control_char_table[ord("\\")] = r"\\"

    def log_request(self, code: int | str = "-", size: int | str = "-") -> None:
        """Log the request line, styled according to the status code."""
        try:
            path = uri_to_iri(self.path)
            msg = f"{self.command} {path} {self.request_version}"
        except AttributeError:
            # path isn't set if the requestline was bad
            msg = self.requestline

        # Escape control characters that may be in the decoded path.
        msg = msg.translate(self._control_char_table)
        code = str(code)

        if code[0] == "1":  # 1xx - Informational
            msg = _ansi_style(msg, "bold")
        elif code == "200":  # 2xx - Success
            pass
        elif code == "304":  # 304 - Resource Not Modified
            msg = _ansi_style(msg, "cyan")
        elif code[0] == "3":  # 3xx - Redirection
            msg = _ansi_style(msg, "green")
        elif code == "404":  # 404 - Resource Not Found
            msg = _ansi_style(msg, "yellow")
        elif code[0] == "4":  # 4xx - Client Error
            msg = _ansi_style(msg, "bold", "red")
        else:  # 5xx, or any other response
            msg = _ansi_style(msg, "bold", "magenta")

        self.log("info", '"%s" %s %s', msg, code, size)

    def log_error(self, format: str, *args: t.Any) -> None:
        self.log("error", format, *args)

    def log_message(self, format: str, *args: t.Any) -> None:
        self.log("info", format, *args)

    def log(self, type: str, message: str, *args: t.Any) -> None:
        """Send a message, prefixed with client address and timestamp, to ``_log``."""
        _log(
            type,
            f"{self.address_string()} - - [{self.log_date_time_string()}] {message}\n",
            *args,
        )
def _ansi_style(value: str, *styles: str) -> str:
    """Wrap ``value`` in ANSI escape sequences for the given style names.

    Returns ``value`` unchanged when styling is disabled. Each style's escape
    is prepended in turn, so the last style given ends up first in the
    result. A reset sequence is always appended.
    """
    if not _log_add_style:
        return value

    sgr_codes = {
        "bold": 1,
        "red": 31,
        "green": 32,
        "yellow": 33,
        "magenta": 35,
        "cyan": 36,
    }
    escapes = [f"\x1b[{sgr_codes[name]}m" for name in styles]
    escapes.reverse()
    prefix = "".join(escapes)
    return f"{prefix}{value}\x1b[0m"
def generate_adhoc_ssl_pair(
    cn: str | None = None,
) -> tuple[Certificate, RSAPrivateKeyWithSerialization]:
    """Generate a self-signed certificate and its RSA private key.

    :param cn: The common name (also used as the DNS subject alternative
        name) of the certificate. Defaults to ``"*"``.
    :return: A ``(certificate, private_key)`` tuple. The certificate is
        valid for 365 days from the time of the call.
    :raises TypeError: If the ``cryptography`` library is not installed.
    """
    try:
        from cryptography import x509
        from cryptography.x509.oid import NameOID
        from cryptography.hazmat.backends import default_backend
        from cryptography.hazmat.primitives import hashes
        from cryptography.hazmat.primitives.asymmetric import rsa
    except ImportError:
        raise TypeError(
            "Using ad-hoc certificates requires the cryptography library."
        ) from None

    # Looked up once and reused for key generation and signing.
    backend = default_backend()
    pkey = rsa.generate_private_key(
        public_exponent=65537, key_size=2048, backend=backend
    )

    # pretty damn sure that this is not actually accepted by anyone
    if cn is None:
        cn = "*"

    subject = x509.Name(
        [
            x509.NameAttribute(NameOID.ORGANIZATION_NAME, "Dummy Certificate"),
            x509.NameAttribute(NameOID.COMMON_NAME, cn),
        ]
    )

    # Take a single timestamp so the validity window is exactly 365 days.
    now = dt.now(timezone.utc)
    cert = (
        x509.CertificateBuilder()
        .subject_name(subject)
        # Self-signed: issuer and subject are the same name.
        .issuer_name(subject)
        .public_key(pkey.public_key())
        .serial_number(x509.random_serial_number())
        .not_valid_before(now)
        .not_valid_after(now + timedelta(days=365))
        .add_extension(x509.ExtendedKeyUsage([x509.OID_SERVER_AUTH]), critical=False)
        .add_extension(x509.SubjectAlternativeName([x509.DNSName(cn)]), critical=False)
        .sign(pkey, hashes.SHA256(), backend)
    )
    return cert, pkey
def make_ssl_devcert(
    base_path: str, host: str | None = None, cn: str | None = None
) -> tuple[str, str]:
    """Create a certificate and key file pair for development.

    Use this instead of the ``'adhoc'`` option, which generates a new
    certificate on every server start. When ``host`` is given, the common
    name is set to ``*.host/CN=host`` and any ``cn`` argument is ignored.

    For more information see :func:`run_simple`.

    .. versionadded:: 0.9

    :param base_path: Path prefix for the two files. ``.crt`` is appended
        for the certificate and ``.key`` for the private key.
    :param host: Host name, usable as an alternative to ``cn``.
    :param cn: The ``CN`` to use.
    :return: A ``(cert_file, pkey_file)`` tuple of the paths written.
    """
    if host is not None:
        cn = f"*.{host}/CN={host}"

    certificate, private_key = generate_adhoc_ssl_pair(cn=cn)

    from cryptography.hazmat.primitives import serialization

    pem = serialization.Encoding.PEM
    cert_file = f"{base_path}.crt"
    pkey_file = f"{base_path}.key"

    with open(cert_file, "wb") as cert_out:
        cert_out.write(certificate.public_bytes(pem))

    with open(pkey_file, "wb") as key_out:
        # The key is stored unencrypted in the traditional OpenSSL format.
        key_out.write(
            private_key.private_bytes(
                encoding=pem,
                format=serialization.PrivateFormat.TraditionalOpenSSL,
                encryption_algorithm=serialization.NoEncryption(),
            )
        )

    return cert_file, pkey_file
def generate_adhoc_ssl_context() -> ssl.SSLContext:
    """Build an SSL context from a freshly generated self-signed pair.

    The certificate and key are written to two temporary files which are
    registered for removal at interpreter exit.
    """
    import tempfile
    import atexit

    certificate, private_key = generate_adhoc_ssl_pair()

    from cryptography.hazmat.primitives import serialization

    cert_fd, cert_path = tempfile.mkstemp()
    key_fd, key_path = tempfile.mkstemp()
    atexit.register(os.remove, key_path)
    atexit.register(os.remove, cert_path)

    cert_pem = certificate.public_bytes(serialization.Encoding.PEM)
    os.write(cert_fd, cert_pem)

    key_pem = private_key.private_bytes(
        encoding=serialization.Encoding.PEM,
        format=serialization.PrivateFormat.TraditionalOpenSSL,
        encryption_algorithm=serialization.NoEncryption(),
    )
    os.write(key_fd, key_pem)

    os.close(cert_fd)
    os.close(key_fd)
    return load_ssl_context(cert_path, key_path)
def load_ssl_context(
    cert_file: str, pkey_file: str | None = None, protocol: int | None = None
) -> ssl.SSLContext:
    """Create an :class:`ssl.SSLContext` and load a certificate chain into it.

    :param cert_file: Path of the certificate to use.
    :param pkey_file: Path of the private key to use. If not given, the key
        will be obtained from the certificate file.
    :param protocol: A ``PROTOCOL`` constant from the :mod:`ssl` module.
        Defaults to :data:`ssl.PROTOCOL_TLS_SERVER`.
    :return: The context with the certificate chain loaded.
    """
    chosen_protocol = ssl.PROTOCOL_TLS_SERVER if protocol is None else protocol
    context = ssl.SSLContext(chosen_protocol)
    context.load_cert_chain(cert_file, pkey_file)
    return context
def is_ssl_error(error: Exception | None = None) -> bool:
    """Tell whether an exception is an :class:`ssl.SSLError`.

    When ``error`` is ``None`` the exception currently being handled, as
    reported by :func:`sys.exc_info`, is examined instead.
    """
    candidate = sys.exc_info()[1] if error is None else error
    return isinstance(candidate, ssl.SSLError)
def select_address_family(host: str, port: int) -> socket.AddressFamily:
    """Pick the socket address family for ``host``.

    Returns ``AF_UNIX`` for hosts starting with ``unix://``, ``AF_INET6``
    for hosts containing a colon when the platform defines it, and
    ``AF_INET`` otherwise. ``port`` is accepted but not inspected.
    """
    if host.startswith("unix://"):
        return socket.AF_UNIX

    looks_like_ipv6 = ":" in host

    if looks_like_ipv6 and hasattr(socket, "AF_INET6"):
        return socket.AF_INET6

    return socket.AF_INET
def get_sockaddr(
    host: str, port: int, family: socket.AddressFamily
) -> tuple[str, int] | str:
    """Resolve ``host`` and ``port`` into an address suitable for binding.

    For the Unix socket family the part of ``host`` after ``://`` is
    returned as an absolute path. Otherwise the first ``getaddrinfo`` result
    is used, falling back to ``(host, port)`` when resolution fails.
    """
    if family == af_unix:
        # Absolute path avoids IDNA encoding error when path starts with dot.
        _scheme, _sep, socket_path = host.partition("://")
        return os.path.abspath(socket_path)

    try:
        candidates = socket.getaddrinfo(
            host, port, family, socket.SOCK_STREAM, socket.IPPROTO_TCP
        )
    except socket.gaierror:
        return host, port

    first = candidates[0]
    # Index 4 of an addrinfo entry is the socket address.
    return first[4]  # type: ignore
def get_interface_ip(family: socket.AddressFamily) -> str:
    """Get the IP address of an external interface, to show a more useful
    URL when the server is bound to all addresses.

    A datagram socket is connected to an arbitrary private address and the
    local address chosen for it is returned. If connecting fails, the
    loopback address of the family is returned instead.

    :meta private:
    """
    is_ipv6 = family == socket.AF_INET6

    # arbitrary private address
    if is_ipv6:
        probe_host, loopback = "fd31:f903:5ab5:1::1", "::1"
    else:
        probe_host, loopback = "10.253.155.219", "127.0.0.1"

    with socket.socket(family, socket.SOCK_DGRAM) as probe:
        try:
            probe.connect((probe_host, 58162))
        except OSError:
            return loopback

        local_address = probe.getsockname()
        return local_address[0]  # type: ignore
class BaseWSGIServer(HTTPServer):
    """A WSGI server that handles one request at a time.

    Use :func:`make_server` to create a server instance.
    """

    multithread = False
    multiprocess = False
    request_queue_size = LISTEN_QUEUE
    allow_reuse_address = True

    def __init__(
        self,
        host: str,
        port: int,
        app: WSGIApplication,
        handler: type[WSGIRequestHandler] | None = None,
        passthrough_errors: bool = False,
        ssl_context: _TSSLContextArg | None = None,
        fd: int | None = None,
    ) -> None:
        if handler is None:
            handler = WSGIRequestHandler

        # If the handler doesn't directly set a protocol version and
        # thread or process workers are used, then allow chunked
        # responses and keep-alive connections by enabling HTTP/1.1.
        uses_workers = self.multithread or self.multiprocess

        if uses_workers and "protocol_version" not in vars(handler):
            handler.protocol_version = "HTTP/1.1"

        self.host = host
        self.port = port
        self.app = app
        self.passthrough_errors = passthrough_errors

        family = select_address_family(host, port)
        self.address_family = family
        bind_address = get_sockaddr(host, int(port), family)
        is_unix_socket = family == af_unix

        # Remove a leftover Unix socket file from a previous run. Don't
        # remove a file that was set up by run_simple.
        if is_unix_socket and fd is None:
            socket_path = t.cast(str, bind_address)

            if os.path.exists(socket_path):
                os.unlink(socket_path)

        # Bind and activate will be handled manually, it should only
        # happen if we're not using a socket that was already set up.
        super().__init__(
            bind_address,  # type: ignore[arg-type]
            handler,
            bind_and_activate=False,
        )

        if fd is not None:
            # TCPServer automatically opens a socket even if bind_and_activate is False.
            # Close it to silence a ResourceWarning.
            self.server_close()

            # Use the passed in socket directly.
            self.socket = socket.fromfd(fd, family, socket.SOCK_STREAM)
            self.server_address = self.socket.getsockname()
        else:
            # No existing socket descriptor, do bind_and_activate=True.
            try:
                self.server_bind()
                self.server_activate()
            except OSError as exc:
                # Catch connection issues and show them without the traceback. Show
                # extra instructions for address not found, and for macOS.
                self.server_close()
                print(exc.strerror, file=sys.stderr)

                if exc.errno == errno.EADDRINUSE:
                    port_hint = (
                        f"Port {port} is in use by another program. Either identify and"
                        " stop that program, or start the server with a different"
                        " port."
                    )
                    print(port_hint, file=sys.stderr)

                    if sys.platform == "darwin" and port == 5000:
                        airplay_hint = (
                            "On macOS, try disabling the 'AirPlay Receiver' service"
                            " from System Preferences -> General -> AirDrop & Handoff."
                        )
                        print(airplay_hint, file=sys.stderr)

                sys.exit(1)
            except BaseException:
                self.server_close()
                raise

        if not is_unix_socket:
            # If port was 0, this will record the bound port.
            self.port = self.server_address[1]

        if ssl_context is None:
            self.ssl_context = None
        else:
            # Normalise the tuple and "adhoc" forms to a real context.
            if isinstance(ssl_context, tuple):
                ssl_context = load_ssl_context(*ssl_context)
            elif ssl_context == "adhoc":
                ssl_context = generate_adhoc_ssl_context()

            self.socket = ssl_context.wrap_socket(self.socket, server_side=True)
            self.ssl_context: ssl.SSLContext | None = ssl_context

        import importlib.metadata

        self._server_version = f"Werkzeug/{importlib.metadata.version('werkzeug')}"

    def log(self, type: str, message: str, *args: t.Any) -> None:
        """Forward a message to the module's ``_log`` helper."""
        _log(type, message, *args)

    def serve_forever(self, poll_interval: float = 0.5) -> None:
        """Serve until interrupted, then close the server socket.

        A ``KeyboardInterrupt`` ends the loop without propagating.
        """
        try:
            super().serve_forever(poll_interval=poll_interval)
        except KeyboardInterrupt:
            pass
        finally:
            self.server_close()

    def handle_error(
        self, request: t.Any, client_address: tuple[str, int] | str
    ) -> None:
        """Re-raise the active exception when ``passthrough_errors`` is set,
        otherwise defer to the base class handling.
        """
        if self.passthrough_errors:
            raise

        return super().handle_error(request, client_address)

    def log_startup(self) -> None:
        """Show information about the address when starting the server."""
        warning = _ansi_style(
            "WARNING: This is a development server. Do not use it in a production"
            " deployment. Use a production WSGI server instead.",
            "bold",
            "red",
        )
        lines = [warning]

        if self.address_family == af_unix:
            lines.append(f" * Running on {self.host}")
        else:
            scheme = "https" if self.ssl_context is not None else "http"
            shown_host = self.host

            if self.host in {"0.0.0.0", "::"}:
                lines.append(f" * Running on all addresses ({self.host})")

                if self.host == "0.0.0.0":
                    loopback, probe_family = "127.0.0.1", socket.AF_INET
                else:
                    loopback, probe_family = "[::1]", socket.AF_INET6

                shown_host = get_interface_ip(probe_family)
                lines.append(f" * Running on {scheme}://{loopback}:{self.port}")

            # Addresses containing a colon are bracketed for use in a URL.
            if ":" in shown_host:
                shown_host = f"[{shown_host}]"

            lines.append(f" * Running on {scheme}://{shown_host}:{self.port}")

        _log("info", "\n".join(lines))
class ThreadedWSGIServer(socketserver.ThreadingMixIn, BaseWSGIServer):
    """A WSGI server that serves each request in its own thread, so
    requests can be handled concurrently.

    Use :func:`make_server` to create a server instance.
    """

    # Reported to applications as ``wsgi.multithread``.
    multithread = True
    daemon_threads = True
class ForkingWSGIServer(ForkingMixIn, BaseWSGIServer):
    """A WSGI server that serves each request in a forked process, so
    requests can be handled concurrently.

    Use :func:`make_server` to create a server instance.
    """

    # Reported to applications as ``wsgi.multiprocess``.
    multiprocess = True

    def __init__(
        self,
        host: str,
        port: int,
        app: WSGIApplication,
        processes: int = 40,
        handler: type[WSGIRequestHandler] | None = None,
        passthrough_errors: bool = False,
        ssl_context: _TSSLContextArg | None = None,
        fd: int | None = None,
    ) -> None:
        if not can_fork:
            raise ValueError("Your platform does not support forking.")

        super().__init__(
            host,
            port,
            app,
            handler=handler,
            passthrough_errors=passthrough_errors,
            ssl_context=ssl_context,
            fd=fd,
        )
        # ``processes`` is stored as the mix-in's ``max_children`` setting.
        self.max_children = processes
def make_server(
    host: str,
    port: int,
    app: WSGIApplication,
    threaded: bool = False,
    processes: int = 1,
    request_handler: type[WSGIRequestHandler] | None = None,
    passthrough_errors: bool = False,
    ssl_context: _TSSLContextArg | None = None,
    fd: int | None = None,
) -> BaseWSGIServer:
    """Create the WSGI server class selected by ``threaded`` and
    ``processes``.

    This is called from :func:`run_simple`, but can be used separately
    to have access to the server object, such as to run it in a separate
    thread.

    See :func:`run_simple` for parameter docs.

    :raises ValueError: If both ``threaded`` and ``processes > 1`` are
        requested.
    """
    wants_processes = processes > 1

    if threaded and wants_processes:
        raise ValueError("Cannot have a multi-thread and multi-process server.")

    # Options accepted under the same names by all three server classes.
    common = {
        "handler": request_handler,
        "passthrough_errors": passthrough_errors,
        "ssl_context": ssl_context,
        "fd": fd,
    }

    if threaded:
        return ThreadedWSGIServer(host, port, app, **common)

    if wants_processes:
        return ForkingWSGIServer(host, port, app, processes=processes, **common)

    return BaseWSGIServer(host, port, app, **common)
def is_running_from_reloader() -> bool:
    """Report whether this process is the subprocess started by the
    Werkzeug reloader.

    The check is whether the ``WERKZEUG_RUN_MAIN`` environment variable is
    exactly ``"true"``.

    .. versionadded:: 0.10
    """
    marker = os.getenv("WERKZEUG_RUN_MAIN")
    return marker == "true"
def run_simple(
    hostname: str,
    port: int,
    application: WSGIApplication,
    use_reloader: bool = False,
    use_debugger: bool = False,
    use_evalex: bool = True,
    extra_files: t.Iterable[str] | None = None,
    exclude_patterns: t.Iterable[str] | None = None,
    reloader_interval: int = 1,
    reloader_type: str = "auto",
    threaded: bool = False,
    processes: int = 1,
    request_handler: type[WSGIRequestHandler] | None = None,
    static_files: dict[str, str | tuple[str, str]] | None = None,
    passthrough_errors: bool = False,
    ssl_context: _TSSLContextArg | None = None,
) -> None:
    """Run a WSGI application on the built-in development server,
    optionally with the reloader, debugger, and static file serving.

    .. warning::

        Do not use the development server when deploying to production.
        It is intended for use only during local development. It is not
        designed to be particularly efficient, stable, or secure.

    :param hostname: Host to bind to, such as ``'localhost'``. May be a
        domain, an IPv4 or IPv6 address, or a file path prefixed with
        ``unix://`` to bind a Unix socket.
    :param port: Port to bind to, such as ``8080``. ``0`` asks the OS
        for a random free port. Must be an ``int``.
    :param application: The WSGI application to serve.
    :param use_reloader: Run under a reloader process that restarts the
        server process when files change.
    :param use_debugger: Wrap the application in Werkzeug's debugger,
        which renders formatted tracebacks for unhandled exceptions.
    :param use_evalex: Make the debugger interactive, so a Python
        terminal can be opened for any traceback frame. A PIN gives
        some protection, but this must never be enabled on a publicly
        visible server.
    :param extra_files: Additional files, besides Python modules, that
        the reloader watches for changes, such as a configuration file.
    :param exclude_patterns: :mod:`fnmatch` patterns; changes to
        matching files are ignored by the reloader, for example cache
        files.
    :param reloader_interval: How often the reloader checks for
        changes.
    :param reloader_type: Which reloader to use. ``'stat'`` is built in
        but may need significant CPU to watch files. ``'watchdog'`` is
        much more efficient but requires the ``watchdog`` package to be
        installed first.
    :param threaded: Serve concurrent requests with threads. Mutually
        exclusive with ``processes``.
    :param processes: Serve concurrent requests with up to this many
        processes. Mutually exclusive with ``threaded``.
    :param request_handler: A :class:`WSGIRequestHandler` subclass to
        handle requests with instead of the default.
    :param static_files: Mapping of URL prefixes to directories whose
        files are served through
        :class:`~werkzeug.middleware.shared_data.SharedDataMiddleware`.
    :param passthrough_errors: Do not catch unhandled exceptions at the
        server level; let the server crash instead. With
        ``use_debugger`` enabled the debugger still catches them.
    :param ssl_context: TLS configuration for serving over HTTPS. Either
        an :class:`ssl.SSLContext`, a ``(cert_file, key_file)`` tuple
        from which a typical context is built, or the string
        ``'adhoc'`` to generate a temporary self-signed certificate.
    :raises TypeError: If ``port`` is not an integer.

    .. versionchanged:: 2.1
        Instructions are shown for dealing with an "address already in
        use" error.

    .. versionchanged:: 2.1
        Running on ``0.0.0.0`` or ``::`` shows the loopback IP in
        addition to a real IP.

    .. versionchanged:: 2.1
        The command-line interface was removed.

    .. versionchanged:: 2.0
        Running on ``0.0.0.0`` or ``::`` shows a real IP address that
        was bound as well as a warning not to run the development server
        in production.

    .. versionchanged:: 2.0
        The ``exclude_patterns`` parameter was added.

    .. versionchanged:: 0.15
        Bind to a Unix socket by passing a ``hostname`` that starts with
        ``unix://``.

    .. versionchanged:: 0.10
        Improved the reloader and added support for changing the backend
        through the ``reloader_type`` parameter.

    .. versionchanged:: 0.9
        A command-line interface was added.

    .. versionchanged:: 0.8
        ``ssl_context`` can be a tuple of paths to the certificate and
        private key files.

    .. versionchanged:: 0.6
        The ``ssl_context`` parameter was added.

    .. versionchanged:: 0.5
        The ``static_files`` and ``passthrough_errors`` parameters were
        added.
    """
    if not isinstance(port, int):
        raise TypeError("port must be an integer")

    # Middleware is layered inside-out: static files wrap the app first,
    # then the debugger wraps everything so it is the outermost layer.
    if static_files:
        from .middleware.shared_data import SharedDataMiddleware

        application = SharedDataMiddleware(application, static_files)

    if use_debugger:
        from .debug import DebuggedApplication

        application = DebuggedApplication(application, evalex=use_evalex)

    # In the reloader subprocess, pick up the file descriptor that was
    # published through WERKZEUG_SERVER_FD; otherwise start without one.
    inherited_fd = (
        int(os.environ["WERKZEUG_SERVER_FD"]) if is_running_from_reloader() else None
    )
    server = make_server(
        hostname,
        port,
        application,
        threaded,
        processes,
        request_handler,
        passthrough_errors,
        ssl_context,
        fd=inherited_fd,
    )

    # Mark the socket inheritable and publish its descriptor in the
    # environment (presumably so the reloader's child can reuse it).
    server.socket.set_inheritable(True)
    os.environ["WERKZEUG_SERVER_FD"] = str(server.fileno())

    # The startup banner is only logged outside the reloader subprocess.
    if not is_running_from_reloader():
        server.log_startup()
        _log("info", _ansi_style("Press CTRL+C to quit", "yellow"))

    if not use_reloader:
        server.serve_forever()
        return

    from ._reloader import run_with_reloader

    try:
        run_with_reloader(
            server.serve_forever,
            extra_files=extra_files,
            exclude_patterns=exclude_patterns,
            interval=reloader_interval,
            reloader_type=reloader_type,
        )
    finally:
        # Close the server however the reloader call ends.
        server.server_close()
|