123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641 |
- from __future__ import annotations
- import copy
- import math
- import operator
- import typing as t
- from contextvars import ContextVar
- from functools import partial
- from functools import update_wrapper
- from operator import attrgetter
- from .wsgi import ClosingIterator
- if t.TYPE_CHECKING:
- from _typeshed.wsgi import StartResponse
- from _typeshed.wsgi import WSGIApplication
- from _typeshed.wsgi import WSGIEnvironment
# Type of the object held by a ``LocalStack`` or exposed through a
# ``LocalProxy``; also used by ``_identity`` to say "returns its argument".
T = t.TypeVar("T")
# Any callable. Lets ``_l_to_r_op`` declare that the wrapper it returns has
# the same type as the operator it was given.
F = t.TypeVar("F", bound=t.Callable[..., t.Any])
def release_local(local: Local | LocalStack) -> None:
    """Discard whatever a :class:`Local` or :class:`LocalStack` holds for
    the current context, without going through a :class:`LocalManager`.

    The work is delegated to the object's ``__release_local__`` hook.

    This should not be needed for modern use cases, and may be removed
    in the future.

    :param local: The context-local object to reset.

    .. versionadded:: 0.6.1
    """
    release = local.__release_local__
    release()
class Local:
    """A namespace whose attributes are local to the current context.

    The attributes live in a :class:`dict` stored in a
    :class:`ContextVar`. Every write replaces that dict with a modified
    copy instead of mutating it, so nested contexts never see each
    other's changes. That copying may cost more than using individual
    context vars.

    :param context_var: The :class:`~contextvars.ContextVar` to use as
        storage for this local. If not given, one will be created.
        Context vars not created at the global scope may interfere with
        garbage collection.

    .. versionchanged:: 2.0
        Uses ``ContextVar`` instead of a custom storage implementation.
    """

    __slots__ = ("__storage",)

    def __init__(self, context_var: ContextVar[dict[str, t.Any]] | None = None) -> None:
        # A ContextVar created outside the global scope interferes with
        # garbage collection, but a Local is normally a module-level
        # object itself, so creating the var here is not a concern.
        storage = (
            context_var
            if context_var is not None
            else ContextVar(f"werkzeug.Local<{id(self)}>.storage")
        )
        # ``__setattr__`` is overridden to write into the context dict,
        # so the slot has to be filled through ``object`` directly, using
        # the name-mangled slot name.
        object.__setattr__(self, "_Local__storage", storage)

    def __iter__(self) -> t.Iterator[tuple[str, t.Any]]:
        """Iterate over the ``(name, value)`` pairs set in this context."""
        current = self.__storage.get({})
        return iter(current.items())

    def __call__(self, name: str, *, unbound_message: str | None = None) -> LocalProxy:
        """Build a :class:`LocalProxy` for one attribute of this
        namespace.

        :param name: Proxy this attribute.
        :param unbound_message: The error message that the proxy will
            show if the attribute isn't set.
        """
        proxy = LocalProxy(self, name, unbound_message=unbound_message)
        return proxy

    def __release_local__(self) -> None:
        """Reset this context's data to an empty namespace."""
        self.__storage.set({})

    def __getattr__(self, name: str) -> t.Any:
        """Return the context-local value for ``name``.

        :raises AttributeError: If ``name`` is not set in this context.
        """
        current = self.__storage.get({})

        if name not in current:
            raise AttributeError(name)

        return current[name]

    def __setattr__(self, name: str, value: t.Any) -> None:
        """Store ``value`` under ``name`` in a fresh copy of the data."""
        updated = self.__storage.get({}).copy()
        updated[name] = value
        self.__storage.set(updated)

    def __delattr__(self, name: str) -> None:
        """Remove ``name`` from a fresh copy of the data.

        :raises AttributeError: If ``name`` is not set in this context.
        """
        current = self.__storage.get({})

        if name not in current:
            raise AttributeError(name)

        remaining = current.copy()
        del remaining[name]
        self.__storage.set(remaining)
class LocalStack(t.Generic[T]):
    """A stack whose contents are local to the current context.

    The items live in a :class:`list` stored in a :class:`ContextVar`.
    Pushing and popping replace that list with a new one instead of
    mutating it, so nested contexts never see each other's changes.
    That copying may cost more than using individual context vars.

    :param context_var: The :class:`~contextvars.ContextVar` to use as
        storage for this local. If not given, one will be created.
        Context vars not created at the global scope may interfere with
        garbage collection.

    .. versionchanged:: 2.0
        Uses ``ContextVar`` instead of a custom storage implementation.

    .. versionadded:: 0.6.1
    """

    __slots__ = ("_storage",)

    def __init__(self, context_var: ContextVar[list[T]] | None = None) -> None:
        # A ContextVar created outside the global scope interferes with
        # garbage collection, but a stack is normally a module-level
        # object itself, so creating the var here is not a concern.
        self._storage = (
            ContextVar(f"werkzeug.LocalStack<{id(self)}>.storage")
            if context_var is None
            else context_var
        )

    def __release_local__(self) -> None:
        """Reset this context's stack to an empty list."""
        self._storage.set([])

    def push(self, obj: T) -> list[T]:
        """Put ``obj`` on top of the stack and return the new stack list."""
        items = self._storage.get([]).copy()
        items.append(obj)
        self._storage.set(items)
        return items

    def pop(self) -> T | None:
        """Take the top item off the stack and return it, or return
        ``None`` when the stack is empty.
        """
        items = self._storage.get([])

        if not items:
            return None

        # Store a shortened copy; the list seen by other contexts is
        # left untouched.
        self._storage.set(items[:-1])
        return items[-1]

    @property
    def top(self) -> T | None:
        """The item on top of the stack, or ``None`` when the stack is
        empty. The stack is not modified.
        """
        items = self._storage.get([])
        return items[-1] if items else None

    def __call__(
        self, name: str | None = None, *, unbound_message: str | None = None
    ) -> LocalProxy:
        """Build a :class:`LocalProxy` for the top of this stack.

        :param name: If given, the proxy access this attribute of the
            top item, rather than the item itself.
        :param unbound_message: The error message that the proxy will
            show if the stack is empty.
        """
        proxy = LocalProxy(self, name, unbound_message=unbound_message)
        return proxy
class LocalManager:
    """Manage releasing the data for the current context in one or more
    :class:`Local` and :class:`LocalStack` objects.

    This should not be needed for modern use cases, and may be removed
    in the future.

    :param locals: A local or list of locals to manage.

    .. versionchanged:: 2.1
        The ``ident_func`` was removed.

    .. versionchanged:: 0.7
        The ``ident_func`` parameter was added.

    .. versionchanged:: 0.6.1
        The :func:`release_local` function can be used instead of a
        manager.
    """

    __slots__ = ("locals",)

    def __init__(
        self,
        locals: None | (Local | LocalStack | t.Iterable[Local | LocalStack]) = None,
    ) -> None:
        """Store the managed locals as a list.

        :param locals: ``None`` for an empty manager, a single
            :class:`Local` or :class:`LocalStack`, or an iterable of
            them.
        """
        if locals is None:
            self.locals = []
        elif isinstance(locals, (Local, LocalStack)):
            # A single local of either kind. Both must be caught before
            # the iterable branch: ``Local`` is iterable (it yields its
            # items) and ``LocalStack`` is not iterable at all, so
            # ``list(locals)`` would give the wrong result or raise
            # ``TypeError``.
            self.locals = [locals]
        else:
            self.locals = list(locals)  # type: ignore[arg-type]

    def cleanup(self) -> None:
        """Release the data in the locals for this context. Call this at
        the end of each request or use :meth:`make_middleware`.
        """
        for local in self.locals:
            release_local(local)

    def make_middleware(self, app: WSGIApplication) -> WSGIApplication:
        """Wrap a WSGI application so that local data is released
        automatically after the response has been sent for a request.

        :param app: The WSGI application to wrap.
        :return: A WSGI application whose response iterable calls
            :meth:`cleanup` when it is closed.
        """

        def application(
            environ: WSGIEnvironment, start_response: StartResponse
        ) -> t.Iterable[bytes]:
            return ClosingIterator(app(environ, start_response), self.cleanup)

        return application

    def middleware(self, func: WSGIApplication) -> WSGIApplication:
        """Like :meth:`make_middleware` but used as a decorator on the
        WSGI application function.

        .. code-block:: python

            @manager.middleware
            def application(environ, start_response):
                ...
        """
        # Copy the name, docstring, etc. of ``func`` onto the wrapper.
        return update_wrapper(self.make_middleware(func), func)

    def __repr__(self) -> str:
        return f"<{type(self).__name__} storages: {len(self.locals)}>"
class _ProxyLookup:
    """Descriptor that forwards one attribute or special method of
    :class:`LocalProxy` to the object the proxy currently points at.

    :param f: The built-in function this attribute is accessed through.
        Instead of looking up the special method, the function call
        is redone on the object.
    :param fallback: Return this function if the proxy is unbound
        instead of raising a :exc:`RuntimeError`.
    :param is_attr: This proxied name is an attribute, not a function.
        Call the fallback immediately to get the value.
    :param class_value: Value to return when accessed from the
        ``LocalProxy`` class directly. Used for ``__doc__`` so building
        docs still works.
    """

    __slots__ = ("bind_f", "fallback", "is_attr", "class_value", "name")

    def __init__(
        self,
        f: t.Callable | None = None,
        fallback: t.Callable | None = None,
        class_value: t.Any | None = None,
        is_attr: bool = False,
    ) -> None:
        binder: t.Callable[[LocalProxy, t.Any], t.Callable] | None

        if f is None:
            # No function given: ``__get__`` falls back to ``getattr``
            # on the target, which already yields a bound method.
            binder = None
        elif hasattr(f, "__get__"):
            # A Python function is itself a descriptor, so it can be
            # bound to the target like a method.
            def binder(instance: LocalProxy, obj: t.Any) -> t.Callable:
                return f.__get__(obj, type(obj))  # type: ignore

        else:
            # A C function has no ``__get__``; pre-fill its first
            # argument with the target instead.
            def binder(instance: LocalProxy, obj: t.Any) -> t.Callable:
                return partial(f, obj)

        self.bind_f = binder
        self.fallback = fallback
        self.class_value = class_value
        self.is_attr = is_attr

    def __set_name__(self, owner: LocalProxy, name: str) -> None:
        """Remember the attribute name this descriptor was assigned to."""
        self.name = name

    def __get__(self, instance: LocalProxy, owner: type | None = None) -> t.Any:
        """Resolve the proxied attribute.

        Accessed on the class, this returns ``class_value`` if one was
        given, otherwise the descriptor itself. Accessed on a proxy, it
        returns the attribute bound to the proxy's current object. If
        the proxy is unbound, the fallback is used when there is one;
        otherwise the ``RuntimeError`` propagates.
        """
        if instance is None:
            return self if self.class_value is None else self.class_value

        try:
            target = instance._get_current_object()
        except RuntimeError:
            if self.fallback is None:
                raise

            bound_fallback = self.fallback.__get__(instance, owner)
            # __class__ and __doc__ are attributes, not methods, so
            # their fallback is called right away to produce the value.
            return bound_fallback() if self.is_attr else bound_fallback

        if self.bind_f is None:
            return getattr(target, self.name)

        return self.bind_f(instance, target)

    def __repr__(self) -> str:
        return f"proxy {self.name}"

    def __call__(self, instance: LocalProxy, *args: t.Any, **kwargs: t.Any) -> t.Any:
        """Support calling unbound methods from the class. For example,
        this happens with ``copy.copy``, which does
        ``type(x).__copy__(x)``. ``type(x)`` can't be proxied, so it
        returns the proxy type and descriptor.
        """
        bound = self.__get__(instance, type(instance))
        return bound(*args, **kwargs)
class _ProxyIOp(_ProxyLookup):
    """Descriptor for the augmented assignment methods (``+=`` and
    friends) of a proxied object. The operation is applied to the
    target, but the proxy is returned in its place so the name keeps
    referring to the proxy.
    """

    __slots__ = ()

    def __init__(
        self, f: t.Callable | None = None, fallback: t.Callable | None = None
    ) -> None:
        super().__init__(f, fallback)

        def bind_in_place(instance: LocalProxy, obj: t.Any) -> t.Callable:
            def i_op(self: t.Any, other: t.Any) -> LocalProxy:
                # Run the in-place operator on the target and discard
                # its result; hand the proxy back instead.
                f(self, other)  # type: ignore
                return instance

            # Bind to the target so it is passed as ``self``.
            return i_op.__get__(obj, type(obj))  # type: ignore

        # Replace the binder chosen by the base class.
        self.bind_f = bind_in_place
def _l_to_r_op(op: F) -> F:
    """Build the reflected form of a binary operator: the returned
    function calls ``op`` with its two arguments swapped.
    """

    def r_op(right: t.Any, left: t.Any) -> t.Any:
        return op(left, right)

    swapped = t.cast(F, r_op)
    return swapped
def _identity(o: T) -> T:
    """Return the argument unchanged."""
    same = o
    return same
class LocalProxy(t.Generic[T]):
    """A proxy to the object bound to a context-local object. All
    operations on the proxy are forwarded to the bound object. If no
    object is bound, a ``RuntimeError`` is raised.

    :param local: The context-local object that provides the proxied
        object.
    :param name: Proxy this attribute from the proxied object.
    :param unbound_message: The error message to show if the
        context-local object is unbound.

    Proxy a :class:`~contextvars.ContextVar` to make it easier to
    access. Pass a name to proxy that attribute.

    .. code-block:: python

        _request_var = ContextVar("request")
        request = LocalProxy(_request_var)
        session = LocalProxy(_request_var, "session")

    Proxy an attribute on a :class:`Local` namespace by calling the
    local with the attribute name:

    .. code-block:: python

        data = Local()
        user = data("user")

    Proxy the top item on a :class:`LocalStack` by calling the local.
    Pass a name to proxy that attribute.

    .. code-block::

        app_stack = LocalStack()
        current_app = app_stack()
        g = app_stack("g")

    Pass a function to proxy the return value from that function. This
    was previously used to access attributes of local objects before
    that was supported directly.

    .. code-block:: python

        session = LocalProxy(lambda: request.session)

    ``__repr__`` and ``__class__`` are proxied, so ``repr(x)`` and
    ``isinstance(x, cls)`` will look like the proxied object. Use
    ``issubclass(type(x), LocalProxy)`` to check if an object is a
    proxy.

    .. code-block:: python

        repr(user)  # <User admin>
        isinstance(user, User)  # True
        issubclass(type(user), LocalProxy)  # True

    .. versionchanged:: 2.2.2
        ``__wrapped__`` is set when wrapping an object, not only when
        wrapping a function, to prevent doctest from failing.

    .. versionchanged:: 2.2
        Can proxy a ``ContextVar`` or ``LocalStack`` directly.

    .. versionchanged:: 2.2
        The ``name`` parameter can be used with any proxied object, not
        only ``Local``.

    .. versionchanged:: 2.2
        Added the ``unbound_message`` parameter.

    .. versionchanged:: 2.0
        Updated proxied attributes and methods to reflect the current
        data model.

    .. versionchanged:: 0.6.1
        The class can be instantiated with a callable.
    """

    __slots__ = ("__wrapped", "_get_current_object")

    _get_current_object: t.Callable[[], T]
    """Return the current object this proxy is bound to. If the proxy is
    unbound, this raises a ``RuntimeError``.

    This should be used if you need to pass the object to something that
    doesn't understand the proxy. It can also be useful for performance
    if you are accessing the object multiple times in a function, rather
    than going through the proxy multiple times.
    """

    def __init__(
        self,
        local: ContextVar[T] | Local | LocalStack[T] | t.Callable[[], T],
        name: str | None = None,
        *,
        unbound_message: str | None = None,
    ) -> None:
        # How to get from the looked-up object to the proxied value:
        # the object itself, or one of its attributes.
        get_name = _identity if name is None else attrgetter(name)  # type: ignore[assignment]
        message = "object is not bound" if unbound_message is None else unbound_message

        # Build the resolver that matches the kind of source. ``Local``
        # and ``LocalStack`` are callable too, so they must be tested
        # before the generic callable case.
        if isinstance(local, Local):
            if name is None:
                raise TypeError("'name' is required when proxying a 'Local' object.")

            def _get_current_object() -> T:
                # A missing attribute on the namespace means "unbound".
                try:
                    return get_name(local)  # type: ignore[return-value]
                except AttributeError:
                    raise RuntimeError(message) from None

        elif isinstance(local, LocalStack):

            def _get_current_object() -> T:
                # An empty stack reports ``None`` as its top.
                top = local.top

                if top is None:
                    raise RuntimeError(message)

                return get_name(top)

        elif isinstance(local, ContextVar):

            def _get_current_object() -> T:
                # A var with no value in this context means "unbound".
                try:
                    current = local.get()
                except LookupError:
                    raise RuntimeError(message) from None

                return get_name(current)

        elif callable(local):

            def _get_current_object() -> T:
                return get_name(local())

        else:
            raise TypeError(f"Don't know how to proxy '{type(local)}'.")

        # ``__setattr__`` is proxied, so the slots have to be filled
        # through ``object`` directly (using the mangled slot name).
        object.__setattr__(self, "_LocalProxy__wrapped", local)
        object.__setattr__(self, "_get_current_object", _get_current_object)

    # --- introspection and representation ---
    __doc__ = _ProxyLookup(  # type: ignore
        class_value=__doc__, fallback=lambda self: type(self).__doc__, is_attr=True
    )
    __wrapped__ = _ProxyLookup(
        fallback=lambda self: self._LocalProxy__wrapped, is_attr=True
    )
    # __del__ should only delete the proxy
    __repr__ = _ProxyLookup(  # type: ignore
        repr, fallback=lambda self: f"<{type(self).__name__} unbound>"
    )
    __str__ = _ProxyLookup(str)  # type: ignore
    __bytes__ = _ProxyLookup(bytes)
    __format__ = _ProxyLookup()  # type: ignore
    # --- comparison, hashing, truthiness ---
    __lt__ = _ProxyLookup(operator.lt)
    __le__ = _ProxyLookup(operator.le)
    __eq__ = _ProxyLookup(operator.eq)  # type: ignore
    __ne__ = _ProxyLookup(operator.ne)  # type: ignore
    __gt__ = _ProxyLookup(operator.gt)
    __ge__ = _ProxyLookup(operator.ge)
    __hash__ = _ProxyLookup(hash)  # type: ignore
    __bool__ = _ProxyLookup(bool, fallback=lambda self: False)
    # --- attribute access ---
    __getattr__ = _ProxyLookup(getattr)
    # __getattribute__ triggered through __getattr__
    __setattr__ = _ProxyLookup(setattr)  # type: ignore
    __delattr__ = _ProxyLookup(delattr)  # type: ignore
    __dir__ = _ProxyLookup(dir, fallback=lambda self: [])  # type: ignore
    # __get__ (proxying descriptor not supported)
    # __set__ (descriptor)
    # __delete__ (descriptor)
    # __set_name__ (descriptor)
    # __objclass__ (descriptor)
    # __slots__ used by proxy itself
    # __dict__ (__getattr__)
    # __weakref__ (__getattr__)
    # __init_subclass__ (proxying metaclass not supported)
    # __prepare__ (metaclass)
    # --- class identity and calling ---
    __class__ = _ProxyLookup(fallback=lambda self: type(self), is_attr=True)  # type: ignore[assignment]
    __instancecheck__ = _ProxyLookup(lambda self, other: isinstance(other, self))
    __subclasscheck__ = _ProxyLookup(lambda self, other: issubclass(other, self))
    # __class_getitem__ triggered through __getitem__
    __call__ = _ProxyLookup(lambda self, *args, **kwargs: self(*args, **kwargs))
    # --- containers and iteration ---
    __len__ = _ProxyLookup(len)
    __length_hint__ = _ProxyLookup(operator.length_hint)
    __getitem__ = _ProxyLookup(operator.getitem)
    __setitem__ = _ProxyLookup(operator.setitem)
    __delitem__ = _ProxyLookup(operator.delitem)
    # __missing__ triggered through __getitem__
    __iter__ = _ProxyLookup(iter)
    __next__ = _ProxyLookup(next)
    __reversed__ = _ProxyLookup(reversed)
    __contains__ = _ProxyLookup(operator.contains)
    # --- binary operators ---
    __add__ = _ProxyLookup(operator.add)
    __sub__ = _ProxyLookup(operator.sub)
    __mul__ = _ProxyLookup(operator.mul)
    __matmul__ = _ProxyLookup(operator.matmul)
    __truediv__ = _ProxyLookup(operator.truediv)
    __floordiv__ = _ProxyLookup(operator.floordiv)
    __mod__ = _ProxyLookup(operator.mod)
    __divmod__ = _ProxyLookup(divmod)
    __pow__ = _ProxyLookup(pow)
    __lshift__ = _ProxyLookup(operator.lshift)
    __rshift__ = _ProxyLookup(operator.rshift)
    __and__ = _ProxyLookup(operator.and_)
    __xor__ = _ProxyLookup(operator.xor)
    __or__ = _ProxyLookup(operator.or_)
    # --- reflected binary operators (arguments swapped) ---
    __radd__ = _ProxyLookup(_l_to_r_op(operator.add))
    __rsub__ = _ProxyLookup(_l_to_r_op(operator.sub))
    __rmul__ = _ProxyLookup(_l_to_r_op(operator.mul))
    __rmatmul__ = _ProxyLookup(_l_to_r_op(operator.matmul))
    __rtruediv__ = _ProxyLookup(_l_to_r_op(operator.truediv))
    __rfloordiv__ = _ProxyLookup(_l_to_r_op(operator.floordiv))
    __rmod__ = _ProxyLookup(_l_to_r_op(operator.mod))
    __rdivmod__ = _ProxyLookup(_l_to_r_op(divmod))
    __rpow__ = _ProxyLookup(_l_to_r_op(pow))
    __rlshift__ = _ProxyLookup(_l_to_r_op(operator.lshift))
    __rrshift__ = _ProxyLookup(_l_to_r_op(operator.rshift))
    __rand__ = _ProxyLookup(_l_to_r_op(operator.and_))
    __rxor__ = _ProxyLookup(_l_to_r_op(operator.xor))
    __ror__ = _ProxyLookup(_l_to_r_op(operator.or_))
    # --- augmented assignment (returns the proxy) ---
    __iadd__ = _ProxyIOp(operator.iadd)
    __isub__ = _ProxyIOp(operator.isub)
    __imul__ = _ProxyIOp(operator.imul)
    __imatmul__ = _ProxyIOp(operator.imatmul)
    __itruediv__ = _ProxyIOp(operator.itruediv)
    __ifloordiv__ = _ProxyIOp(operator.ifloordiv)
    __imod__ = _ProxyIOp(operator.imod)
    __ipow__ = _ProxyIOp(operator.ipow)
    __ilshift__ = _ProxyIOp(operator.ilshift)
    __irshift__ = _ProxyIOp(operator.irshift)
    __iand__ = _ProxyIOp(operator.iand)
    __ixor__ = _ProxyIOp(operator.ixor)
    __ior__ = _ProxyIOp(operator.ior)
    # --- unary operators and numeric conversion ---
    __neg__ = _ProxyLookup(operator.neg)
    __pos__ = _ProxyLookup(operator.pos)
    __abs__ = _ProxyLookup(abs)
    __invert__ = _ProxyLookup(operator.invert)
    __complex__ = _ProxyLookup(complex)
    __int__ = _ProxyLookup(int)
    __float__ = _ProxyLookup(float)
    __index__ = _ProxyLookup(operator.index)
    __round__ = _ProxyLookup(round)
    __trunc__ = _ProxyLookup(math.trunc)
    __floor__ = _ProxyLookup(math.floor)
    __ceil__ = _ProxyLookup(math.ceil)
    # --- context managers and async protocols ---
    __enter__ = _ProxyLookup()
    __exit__ = _ProxyLookup()
    __await__ = _ProxyLookup()
    __aiter__ = _ProxyLookup()
    __anext__ = _ProxyLookup()
    __aenter__ = _ProxyLookup()
    __aexit__ = _ProxyLookup()
    # --- copying ---
    __copy__ = _ProxyLookup(copy.copy)
    __deepcopy__ = _ProxyLookup(copy.deepcopy)
    # __getnewargs_ex__ (pickle through proxy not supported)
    # __getnewargs__ (pickle)
    # __getstate__ (pickle)
    # __setstate__ (pickle)
    # __reduce__ (pickle)
    # __reduce_ex__ (pickle)
|