fix troostwijk changes
This commit is contained in:
@@ -0,0 +1,34 @@
|
||||
from .accept import Accept as Accept
|
||||
from .accept import CharsetAccept as CharsetAccept
|
||||
from .accept import LanguageAccept as LanguageAccept
|
||||
from .accept import MIMEAccept as MIMEAccept
|
||||
from .auth import Authorization as Authorization
|
||||
from .auth import WWWAuthenticate as WWWAuthenticate
|
||||
from .cache_control import RequestCacheControl as RequestCacheControl
|
||||
from .cache_control import ResponseCacheControl as ResponseCacheControl
|
||||
from .csp import ContentSecurityPolicy as ContentSecurityPolicy
|
||||
from .etag import ETags as ETags
|
||||
from .file_storage import FileMultiDict as FileMultiDict
|
||||
from .file_storage import FileStorage as FileStorage
|
||||
from .headers import EnvironHeaders as EnvironHeaders
|
||||
from .headers import Headers as Headers
|
||||
from .mixins import ImmutableDictMixin as ImmutableDictMixin
|
||||
from .mixins import ImmutableHeadersMixin as ImmutableHeadersMixin
|
||||
from .mixins import ImmutableListMixin as ImmutableListMixin
|
||||
from .mixins import ImmutableMultiDictMixin as ImmutableMultiDictMixin
|
||||
from .mixins import UpdateDictMixin as UpdateDictMixin
|
||||
from .range import ContentRange as ContentRange
|
||||
from .range import IfRange as IfRange
|
||||
from .range import Range as Range
|
||||
from .structures import CallbackDict as CallbackDict
|
||||
from .structures import CombinedMultiDict as CombinedMultiDict
|
||||
from .structures import HeaderSet as HeaderSet
|
||||
from .structures import ImmutableDict as ImmutableDict
|
||||
from .structures import ImmutableList as ImmutableList
|
||||
from .structures import ImmutableMultiDict as ImmutableMultiDict
|
||||
from .structures import ImmutableOrderedMultiDict as ImmutableOrderedMultiDict
|
||||
from .structures import ImmutableTypeConversionDict as ImmutableTypeConversionDict
|
||||
from .structures import iter_multi_items as iter_multi_items
|
||||
from .structures import MultiDict as MultiDict
|
||||
from .structures import OrderedMultiDict as OrderedMultiDict
|
||||
from .structures import TypeConversionDict as TypeConversionDict
|
||||
@@ -0,0 +1,326 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import codecs
|
||||
import re
|
||||
|
||||
from .structures import ImmutableList
|
||||
|
||||
|
||||
class Accept(ImmutableList):
|
||||
"""An :class:`Accept` object is just a list subclass for lists of
|
||||
``(value, quality)`` tuples. It is automatically sorted by specificity
|
||||
and quality.
|
||||
|
||||
All :class:`Accept` objects work similar to a list but provide extra
|
||||
functionality for working with the data. Containment checks are
|
||||
normalized to the rules of that header:
|
||||
|
||||
>>> a = CharsetAccept([('ISO-8859-1', 1), ('utf-8', 0.7)])
|
||||
>>> a.best
|
||||
'ISO-8859-1'
|
||||
>>> 'iso-8859-1' in a
|
||||
True
|
||||
>>> 'UTF8' in a
|
||||
True
|
||||
>>> 'utf7' in a
|
||||
False
|
||||
|
||||
To get the quality for an item you can use normal item lookup:
|
||||
|
||||
>>> print a['utf-8']
|
||||
0.7
|
||||
>>> a['utf7']
|
||||
0
|
||||
|
||||
.. versionchanged:: 0.5
|
||||
:class:`Accept` objects are forced immutable now.
|
||||
|
||||
.. versionchanged:: 1.0.0
|
||||
:class:`Accept` internal values are no longer ordered
|
||||
alphabetically for equal quality tags. Instead the initial
|
||||
order is preserved.
|
||||
|
||||
"""
|
||||
|
||||
def __init__(self, values=()):
|
||||
if values is None:
|
||||
list.__init__(self)
|
||||
self.provided = False
|
||||
elif isinstance(values, Accept):
|
||||
self.provided = values.provided
|
||||
list.__init__(self, values)
|
||||
else:
|
||||
self.provided = True
|
||||
values = sorted(
|
||||
values, key=lambda x: (self._specificity(x[0]), x[1]), reverse=True
|
||||
)
|
||||
list.__init__(self, values)
|
||||
|
||||
def _specificity(self, value):
|
||||
"""Returns a tuple describing the value's specificity."""
|
||||
return (value != "*",)
|
||||
|
||||
def _value_matches(self, value, item):
|
||||
"""Check if a value matches a given accept item."""
|
||||
return item == "*" or item.lower() == value.lower()
|
||||
|
||||
def __getitem__(self, key):
|
||||
"""Besides index lookup (getting item n) you can also pass it a string
|
||||
to get the quality for the item. If the item is not in the list, the
|
||||
returned quality is ``0``.
|
||||
"""
|
||||
if isinstance(key, str):
|
||||
return self.quality(key)
|
||||
return list.__getitem__(self, key)
|
||||
|
||||
def quality(self, key):
|
||||
"""Returns the quality of the key.
|
||||
|
||||
.. versionadded:: 0.6
|
||||
In previous versions you had to use the item-lookup syntax
|
||||
(eg: ``obj[key]`` instead of ``obj.quality(key)``)
|
||||
"""
|
||||
for item, quality in self:
|
||||
if self._value_matches(key, item):
|
||||
return quality
|
||||
return 0
|
||||
|
||||
def __contains__(self, value):
|
||||
for item, _quality in self:
|
||||
if self._value_matches(value, item):
|
||||
return True
|
||||
return False
|
||||
|
||||
def __repr__(self):
|
||||
pairs_str = ", ".join(f"({x!r}, {y})" for x, y in self)
|
||||
return f"{type(self).__name__}([{pairs_str}])"
|
||||
|
||||
def index(self, key):
|
||||
"""Get the position of an entry or raise :exc:`ValueError`.
|
||||
|
||||
:param key: The key to be looked up.
|
||||
|
||||
.. versionchanged:: 0.5
|
||||
This used to raise :exc:`IndexError`, which was inconsistent
|
||||
with the list API.
|
||||
"""
|
||||
if isinstance(key, str):
|
||||
for idx, (item, _quality) in enumerate(self):
|
||||
if self._value_matches(key, item):
|
||||
return idx
|
||||
raise ValueError(key)
|
||||
return list.index(self, key)
|
||||
|
||||
def find(self, key):
|
||||
"""Get the position of an entry or return -1.
|
||||
|
||||
:param key: The key to be looked up.
|
||||
"""
|
||||
try:
|
||||
return self.index(key)
|
||||
except ValueError:
|
||||
return -1
|
||||
|
||||
def values(self):
|
||||
"""Iterate over all values."""
|
||||
for item in self:
|
||||
yield item[0]
|
||||
|
||||
def to_header(self):
|
||||
"""Convert the header set into an HTTP header string."""
|
||||
result = []
|
||||
for value, quality in self:
|
||||
if quality != 1:
|
||||
value = f"{value};q={quality}"
|
||||
result.append(value)
|
||||
return ",".join(result)
|
||||
|
||||
def __str__(self):
|
||||
return self.to_header()
|
||||
|
||||
def _best_single_match(self, match):
|
||||
for client_item, quality in self:
|
||||
if self._value_matches(match, client_item):
|
||||
# self is sorted by specificity descending, we can exit
|
||||
return client_item, quality
|
||||
return None
|
||||
|
||||
def best_match(self, matches, default=None):
|
||||
"""Returns the best match from a list of possible matches based
|
||||
on the specificity and quality of the client. If two items have the
|
||||
same quality and specificity, the one is returned that comes first.
|
||||
|
||||
:param matches: a list of matches to check for
|
||||
:param default: the value that is returned if none match
|
||||
"""
|
||||
result = default
|
||||
best_quality = -1
|
||||
best_specificity = (-1,)
|
||||
for server_item in matches:
|
||||
match = self._best_single_match(server_item)
|
||||
if not match:
|
||||
continue
|
||||
client_item, quality = match
|
||||
specificity = self._specificity(client_item)
|
||||
if quality <= 0 or quality < best_quality:
|
||||
continue
|
||||
# better quality or same quality but more specific => better match
|
||||
if quality > best_quality or specificity > best_specificity:
|
||||
result = server_item
|
||||
best_quality = quality
|
||||
best_specificity = specificity
|
||||
return result
|
||||
|
||||
@property
|
||||
def best(self):
|
||||
"""The best match as value."""
|
||||
if self:
|
||||
return self[0][0]
|
||||
|
||||
|
||||
_mime_split_re = re.compile(r"/|(?:\s*;\s*)")
|
||||
|
||||
|
||||
def _normalize_mime(value):
|
||||
return _mime_split_re.split(value.lower())
|
||||
|
||||
|
||||
class MIMEAccept(Accept):
|
||||
"""Like :class:`Accept` but with special methods and behavior for
|
||||
mimetypes.
|
||||
"""
|
||||
|
||||
def _specificity(self, value):
|
||||
return tuple(x != "*" for x in _mime_split_re.split(value))
|
||||
|
||||
def _value_matches(self, value, item):
|
||||
# item comes from the client, can't match if it's invalid.
|
||||
if "/" not in item:
|
||||
return False
|
||||
|
||||
# value comes from the application, tell the developer when it
|
||||
# doesn't look valid.
|
||||
if "/" not in value:
|
||||
raise ValueError(f"invalid mimetype {value!r}")
|
||||
|
||||
# Split the match value into type, subtype, and a sorted list of parameters.
|
||||
normalized_value = _normalize_mime(value)
|
||||
value_type, value_subtype = normalized_value[:2]
|
||||
value_params = sorted(normalized_value[2:])
|
||||
|
||||
# "*/*" is the only valid value that can start with "*".
|
||||
if value_type == "*" and value_subtype != "*":
|
||||
raise ValueError(f"invalid mimetype {value!r}")
|
||||
|
||||
# Split the accept item into type, subtype, and parameters.
|
||||
normalized_item = _normalize_mime(item)
|
||||
item_type, item_subtype = normalized_item[:2]
|
||||
item_params = sorted(normalized_item[2:])
|
||||
|
||||
# "*/not-*" from the client is invalid, can't match.
|
||||
if item_type == "*" and item_subtype != "*":
|
||||
return False
|
||||
|
||||
return (
|
||||
(item_type == "*" and item_subtype == "*")
|
||||
or (value_type == "*" and value_subtype == "*")
|
||||
) or (
|
||||
item_type == value_type
|
||||
and (
|
||||
item_subtype == "*"
|
||||
or value_subtype == "*"
|
||||
or (item_subtype == value_subtype and item_params == value_params)
|
||||
)
|
||||
)
|
||||
|
||||
@property
|
||||
def accept_html(self):
|
||||
"""True if this object accepts HTML."""
|
||||
return (
|
||||
"text/html" in self or "application/xhtml+xml" in self or self.accept_xhtml
|
||||
)
|
||||
|
||||
@property
|
||||
def accept_xhtml(self):
|
||||
"""True if this object accepts XHTML."""
|
||||
return "application/xhtml+xml" in self or "application/xml" in self
|
||||
|
||||
@property
|
||||
def accept_json(self):
|
||||
"""True if this object accepts JSON."""
|
||||
return "application/json" in self
|
||||
|
||||
|
||||
_locale_delim_re = re.compile(r"[_-]")
|
||||
|
||||
|
||||
def _normalize_lang(value):
|
||||
"""Process a language tag for matching."""
|
||||
return _locale_delim_re.split(value.lower())
|
||||
|
||||
|
||||
class LanguageAccept(Accept):
|
||||
"""Like :class:`Accept` but with normalization for language tags."""
|
||||
|
||||
def _value_matches(self, value, item):
|
||||
return item == "*" or _normalize_lang(value) == _normalize_lang(item)
|
||||
|
||||
def best_match(self, matches, default=None):
|
||||
"""Given a list of supported values, finds the best match from
|
||||
the list of accepted values.
|
||||
|
||||
Language tags are normalized for the purpose of matching, but
|
||||
are returned unchanged.
|
||||
|
||||
If no exact match is found, this will fall back to matching
|
||||
the first subtag (primary language only), first with the
|
||||
accepted values then with the match values. This partial is not
|
||||
applied to any other language subtags.
|
||||
|
||||
The default is returned if no exact or fallback match is found.
|
||||
|
||||
:param matches: A list of supported languages to find a match.
|
||||
:param default: The value that is returned if none match.
|
||||
"""
|
||||
# Look for an exact match first. If a client accepts "en-US",
|
||||
# "en-US" is a valid match at this point.
|
||||
result = super().best_match(matches)
|
||||
|
||||
if result is not None:
|
||||
return result
|
||||
|
||||
# Fall back to accepting primary tags. If a client accepts
|
||||
# "en-US", "en" is a valid match at this point. Need to use
|
||||
# re.split to account for 2 or 3 letter codes.
|
||||
fallback = Accept(
|
||||
[(_locale_delim_re.split(item[0], 1)[0], item[1]) for item in self]
|
||||
)
|
||||
result = fallback.best_match(matches)
|
||||
|
||||
if result is not None:
|
||||
return result
|
||||
|
||||
# Fall back to matching primary tags. If the client accepts
|
||||
# "en", "en-US" is a valid match at this point.
|
||||
fallback_matches = [_locale_delim_re.split(item, 1)[0] for item in matches]
|
||||
result = super().best_match(fallback_matches)
|
||||
|
||||
# Return a value from the original match list. Find the first
|
||||
# original value that starts with the matched primary tag.
|
||||
if result is not None:
|
||||
return next(item for item in matches if item.startswith(result))
|
||||
|
||||
return default
|
||||
|
||||
|
||||
class CharsetAccept(Accept):
|
||||
"""Like :class:`Accept` but with normalization for charsets."""
|
||||
|
||||
def _value_matches(self, value, item):
|
||||
def _normalize(name):
|
||||
try:
|
||||
return codecs.lookup(name).name
|
||||
except LookupError:
|
||||
return name.lower()
|
||||
|
||||
return item == "*" or _normalize(value) == _normalize(item)
|
||||
@@ -0,0 +1,54 @@
|
||||
from collections.abc import Iterable
|
||||
from collections.abc import Iterator
|
||||
from typing import overload
|
||||
|
||||
from .structures import ImmutableList
|
||||
|
||||
class Accept(ImmutableList[tuple[str, int]]):
|
||||
provided: bool
|
||||
def __init__(
|
||||
self, values: Accept | Iterable[tuple[str, float]] | None = None
|
||||
) -> None: ...
|
||||
def _specificity(self, value: str) -> tuple[bool, ...]: ...
|
||||
def _value_matches(self, value: str, item: str) -> bool: ...
|
||||
@overload # type: ignore
|
||||
def __getitem__(self, key: str) -> int: ...
|
||||
@overload
|
||||
def __getitem__(self, key: int) -> tuple[str, int]: ...
|
||||
@overload
|
||||
def __getitem__(self, key: slice) -> Iterable[tuple[str, int]]: ...
|
||||
def quality(self, key: str) -> int: ...
|
||||
def __contains__(self, value: str) -> bool: ... # type: ignore
|
||||
def index(self, key: str) -> int: ... # type: ignore
|
||||
def find(self, key: str) -> int: ...
|
||||
def values(self) -> Iterator[str]: ...
|
||||
def to_header(self) -> str: ...
|
||||
def _best_single_match(self, match: str) -> tuple[str, int] | None: ...
|
||||
@overload
|
||||
def best_match(self, matches: Iterable[str], default: str) -> str: ...
|
||||
@overload
|
||||
def best_match(
|
||||
self, matches: Iterable[str], default: str | None = None
|
||||
) -> str | None: ...
|
||||
@property
|
||||
def best(self) -> str: ...
|
||||
|
||||
def _normalize_mime(value: str) -> list[str]: ...
|
||||
|
||||
class MIMEAccept(Accept):
|
||||
def _specificity(self, value: str) -> tuple[bool, ...]: ...
|
||||
def _value_matches(self, value: str, item: str) -> bool: ...
|
||||
@property
|
||||
def accept_html(self) -> bool: ...
|
||||
@property
|
||||
def accept_xhtml(self) -> bool: ...
|
||||
@property
|
||||
def accept_json(self) -> bool: ...
|
||||
|
||||
def _normalize_lang(value: str) -> list[str]: ...
|
||||
|
||||
class LanguageAccept(Accept):
|
||||
def _value_matches(self, value: str, item: str) -> bool: ...
|
||||
|
||||
class CharsetAccept(Accept):
|
||||
def _value_matches(self, value: str, item: str) -> bool: ...
|
||||
@@ -0,0 +1,318 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import base64
|
||||
import binascii
|
||||
import typing as t
|
||||
|
||||
from ..http import dump_header
|
||||
from ..http import parse_dict_header
|
||||
from ..http import quote_header_value
|
||||
from .structures import CallbackDict
|
||||
|
||||
if t.TYPE_CHECKING:
|
||||
import typing_extensions as te
|
||||
|
||||
|
||||
class Authorization:
|
||||
"""Represents the parts of an ``Authorization`` request header.
|
||||
|
||||
:attr:`.Request.authorization` returns an instance if the header is set.
|
||||
|
||||
An instance can be used with the test :class:`.Client` request methods' ``auth``
|
||||
parameter to send the header in test requests.
|
||||
|
||||
Depending on the auth scheme, either :attr:`parameters` or :attr:`token` will be
|
||||
set. The ``Basic`` scheme's token is decoded into the ``username`` and ``password``
|
||||
parameters.
|
||||
|
||||
For convenience, ``auth["key"]`` and ``auth.key`` both access the key in the
|
||||
:attr:`parameters` dict, along with ``auth.get("key")`` and ``"key" in auth``.
|
||||
|
||||
.. versionchanged:: 2.3
|
||||
The ``token`` parameter and attribute was added to support auth schemes that use
|
||||
a token instead of parameters, such as ``Bearer``.
|
||||
|
||||
.. versionchanged:: 2.3
|
||||
The object is no longer a ``dict``.
|
||||
|
||||
.. versionchanged:: 0.5
|
||||
The object is an immutable dict.
|
||||
"""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
auth_type: str,
|
||||
data: dict[str, str | None] | None = None,
|
||||
token: str | None = None,
|
||||
) -> None:
|
||||
self.type = auth_type
|
||||
"""The authorization scheme, like ``basic``, ``digest``, or ``bearer``."""
|
||||
|
||||
if data is None:
|
||||
data = {}
|
||||
|
||||
self.parameters = data
|
||||
"""A dict of parameters parsed from the header. Either this or :attr:`token`
|
||||
will have a value for a given scheme.
|
||||
"""
|
||||
|
||||
self.token = token
|
||||
"""A token parsed from the header. Either this or :attr:`parameters` will have a
|
||||
value for a given scheme.
|
||||
|
||||
.. versionadded:: 2.3
|
||||
"""
|
||||
|
||||
def __getattr__(self, name: str) -> str | None:
|
||||
return self.parameters.get(name)
|
||||
|
||||
def __getitem__(self, name: str) -> str | None:
|
||||
return self.parameters.get(name)
|
||||
|
||||
def get(self, key: str, default: str | None = None) -> str | None:
|
||||
return self.parameters.get(key, default)
|
||||
|
||||
def __contains__(self, key: str) -> bool:
|
||||
return key in self.parameters
|
||||
|
||||
def __eq__(self, other: object) -> bool:
|
||||
if not isinstance(other, Authorization):
|
||||
return NotImplemented
|
||||
|
||||
return (
|
||||
other.type == self.type
|
||||
and other.token == self.token
|
||||
and other.parameters == self.parameters
|
||||
)
|
||||
|
||||
@classmethod
|
||||
def from_header(cls, value: str | None) -> te.Self | None:
|
||||
"""Parse an ``Authorization`` header value and return an instance, or ``None``
|
||||
if the value is empty.
|
||||
|
||||
:param value: The header value to parse.
|
||||
|
||||
.. versionadded:: 2.3
|
||||
"""
|
||||
if not value:
|
||||
return None
|
||||
|
||||
scheme, _, rest = value.partition(" ")
|
||||
scheme = scheme.lower()
|
||||
rest = rest.strip()
|
||||
|
||||
if scheme == "basic":
|
||||
try:
|
||||
username, _, password = base64.b64decode(rest).decode().partition(":")
|
||||
except (binascii.Error, UnicodeError):
|
||||
return None
|
||||
|
||||
return cls(scheme, {"username": username, "password": password})
|
||||
|
||||
if "=" in rest.rstrip("="):
|
||||
# = that is not trailing, this is parameters.
|
||||
return cls(scheme, parse_dict_header(rest), None)
|
||||
|
||||
# No = or only trailing =, this is a token.
|
||||
return cls(scheme, None, rest)
|
||||
|
||||
def to_header(self) -> str:
|
||||
"""Produce an ``Authorization`` header value representing this data.
|
||||
|
||||
.. versionadded:: 2.0
|
||||
"""
|
||||
if self.type == "basic":
|
||||
value = base64.b64encode(
|
||||
f"{self.username}:{self.password}".encode()
|
||||
).decode("utf8")
|
||||
return f"Basic {value}"
|
||||
|
||||
if self.token is not None:
|
||||
return f"{self.type.title()} {self.token}"
|
||||
|
||||
return f"{self.type.title()} {dump_header(self.parameters)}"
|
||||
|
||||
def __str__(self) -> str:
|
||||
return self.to_header()
|
||||
|
||||
def __repr__(self) -> str:
|
||||
return f"<{type(self).__name__} {self.to_header()}>"
|
||||
|
||||
|
||||
class WWWAuthenticate:
|
||||
"""Represents the parts of a ``WWW-Authenticate`` response header.
|
||||
|
||||
Set :attr:`.Response.www_authenticate` to an instance of list of instances to set
|
||||
values for this header in the response. Modifying this instance will modify the
|
||||
header value.
|
||||
|
||||
Depending on the auth scheme, either :attr:`parameters` or :attr:`token` should be
|
||||
set. The ``Basic`` scheme will encode ``username`` and ``password`` parameters to a
|
||||
token.
|
||||
|
||||
For convenience, ``auth["key"]`` and ``auth.key`` both act on the :attr:`parameters`
|
||||
dict, and can be used to get, set, or delete parameters. ``auth.get("key")`` and
|
||||
``"key" in auth`` are also provided.
|
||||
|
||||
.. versionchanged:: 2.3
|
||||
The ``token`` parameter and attribute was added to support auth schemes that use
|
||||
a token instead of parameters, such as ``Bearer``.
|
||||
|
||||
.. versionchanged:: 2.3
|
||||
The object is no longer a ``dict``.
|
||||
|
||||
.. versionchanged:: 2.3
|
||||
The ``on_update`` parameter was removed.
|
||||
"""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
auth_type: str,
|
||||
values: dict[str, str | None] | None = None,
|
||||
token: str | None = None,
|
||||
):
|
||||
self._type = auth_type.lower()
|
||||
self._parameters: dict[str, str | None] = CallbackDict( # type: ignore[misc]
|
||||
values, lambda _: self._trigger_on_update()
|
||||
)
|
||||
self._token = token
|
||||
self._on_update: t.Callable[[WWWAuthenticate], None] | None = None
|
||||
|
||||
def _trigger_on_update(self) -> None:
|
||||
if self._on_update is not None:
|
||||
self._on_update(self)
|
||||
|
||||
@property
|
||||
def type(self) -> str:
|
||||
"""The authorization scheme, like ``basic``, ``digest``, or ``bearer``."""
|
||||
return self._type
|
||||
|
||||
@type.setter
|
||||
def type(self, value: str) -> None:
|
||||
self._type = value
|
||||
self._trigger_on_update()
|
||||
|
||||
@property
|
||||
def parameters(self) -> dict[str, str | None]:
|
||||
"""A dict of parameters for the header. Only one of this or :attr:`token` should
|
||||
have a value for a given scheme.
|
||||
"""
|
||||
return self._parameters
|
||||
|
||||
@parameters.setter
|
||||
def parameters(self, value: dict[str, str]) -> None:
|
||||
self._parameters = CallbackDict( # type: ignore[misc]
|
||||
value, lambda _: self._trigger_on_update()
|
||||
)
|
||||
self._trigger_on_update()
|
||||
|
||||
@property
|
||||
def token(self) -> str | None:
|
||||
"""A dict of parameters for the header. Only one of this or :attr:`token` should
|
||||
have a value for a given scheme.
|
||||
"""
|
||||
return self._token
|
||||
|
||||
@token.setter
|
||||
def token(self, value: str | None) -> None:
|
||||
"""A token for the header. Only one of this or :attr:`parameters` should have a
|
||||
value for a given scheme.
|
||||
|
||||
.. versionadded:: 2.3
|
||||
"""
|
||||
self._token = value
|
||||
self._trigger_on_update()
|
||||
|
||||
def __getitem__(self, key: str) -> str | None:
|
||||
return self.parameters.get(key)
|
||||
|
||||
def __setitem__(self, key: str, value: str | None) -> None:
|
||||
if value is None:
|
||||
if key in self.parameters:
|
||||
del self.parameters[key]
|
||||
else:
|
||||
self.parameters[key] = value
|
||||
|
||||
self._trigger_on_update()
|
||||
|
||||
def __delitem__(self, key: str) -> None:
|
||||
if key in self.parameters:
|
||||
del self.parameters[key]
|
||||
self._trigger_on_update()
|
||||
|
||||
def __getattr__(self, name: str) -> str | None:
|
||||
return self[name]
|
||||
|
||||
def __setattr__(self, name: str, value: str | None) -> None:
|
||||
if name in {"_type", "_parameters", "_token", "_on_update"}:
|
||||
super().__setattr__(name, value)
|
||||
else:
|
||||
self[name] = value
|
||||
|
||||
def __delattr__(self, name: str) -> None:
|
||||
del self[name]
|
||||
|
||||
def __contains__(self, key: str) -> bool:
|
||||
return key in self.parameters
|
||||
|
||||
def __eq__(self, other: object) -> bool:
|
||||
if not isinstance(other, WWWAuthenticate):
|
||||
return NotImplemented
|
||||
|
||||
return (
|
||||
other.type == self.type
|
||||
and other.token == self.token
|
||||
and other.parameters == self.parameters
|
||||
)
|
||||
|
||||
def get(self, key: str, default: str | None = None) -> str | None:
|
||||
return self.parameters.get(key, default)
|
||||
|
||||
@classmethod
|
||||
def from_header(cls, value: str | None) -> te.Self | None:
|
||||
"""Parse a ``WWW-Authenticate`` header value and return an instance, or ``None``
|
||||
if the value is empty.
|
||||
|
||||
:param value: The header value to parse.
|
||||
|
||||
.. versionadded:: 2.3
|
||||
"""
|
||||
if not value:
|
||||
return None
|
||||
|
||||
scheme, _, rest = value.partition(" ")
|
||||
scheme = scheme.lower()
|
||||
rest = rest.strip()
|
||||
|
||||
if "=" in rest.rstrip("="):
|
||||
# = that is not trailing, this is parameters.
|
||||
return cls(scheme, parse_dict_header(rest), None)
|
||||
|
||||
# No = or only trailing =, this is a token.
|
||||
return cls(scheme, None, rest)
|
||||
|
||||
def to_header(self) -> str:
|
||||
"""Produce a ``WWW-Authenticate`` header value representing this data."""
|
||||
if self.token is not None:
|
||||
return f"{self.type.title()} {self.token}"
|
||||
|
||||
if self.type == "digest":
|
||||
items = []
|
||||
|
||||
for key, value in self.parameters.items():
|
||||
if key in {"realm", "domain", "nonce", "opaque", "qop"}:
|
||||
value = quote_header_value(value, allow_token=False)
|
||||
else:
|
||||
value = quote_header_value(value)
|
||||
|
||||
items.append(f"{key}={value}")
|
||||
|
||||
return f"Digest {', '.join(items)}"
|
||||
|
||||
return f"{self.type.title()} {dump_header(self.parameters)}"
|
||||
|
||||
def __str__(self) -> str:
|
||||
return self.to_header()
|
||||
|
||||
def __repr__(self) -> str:
|
||||
return f"<{type(self).__name__} {self.to_header()}>"
|
||||
@@ -0,0 +1,175 @@
|
||||
from __future__ import annotations
|
||||
|
||||
from .mixins import ImmutableDictMixin
|
||||
from .mixins import UpdateDictMixin
|
||||
|
||||
|
||||
def cache_control_property(key, empty, type):
|
||||
"""Return a new property object for a cache header. Useful if you
|
||||
want to add support for a cache extension in a subclass.
|
||||
|
||||
.. versionchanged:: 2.0
|
||||
Renamed from ``cache_property``.
|
||||
"""
|
||||
return property(
|
||||
lambda x: x._get_cache_value(key, empty, type),
|
||||
lambda x, v: x._set_cache_value(key, v, type),
|
||||
lambda x: x._del_cache_value(key),
|
||||
f"accessor for {key!r}",
|
||||
)
|
||||
|
||||
|
||||
class _CacheControl(UpdateDictMixin, dict):
|
||||
"""Subclass of a dict that stores values for a Cache-Control header. It
|
||||
has accessors for all the cache-control directives specified in RFC 2616.
|
||||
The class does not differentiate between request and response directives.
|
||||
|
||||
Because the cache-control directives in the HTTP header use dashes the
|
||||
python descriptors use underscores for that.
|
||||
|
||||
To get a header of the :class:`CacheControl` object again you can convert
|
||||
the object into a string or call the :meth:`to_header` method. If you plan
|
||||
to subclass it and add your own items have a look at the sourcecode for
|
||||
that class.
|
||||
|
||||
.. versionchanged:: 2.1.0
|
||||
Setting int properties such as ``max_age`` will convert the
|
||||
value to an int.
|
||||
|
||||
.. versionchanged:: 0.4
|
||||
|
||||
Setting `no_cache` or `private` to boolean `True` will set the implicit
|
||||
none-value which is ``*``:
|
||||
|
||||
>>> cc = ResponseCacheControl()
|
||||
>>> cc.no_cache = True
|
||||
>>> cc
|
||||
<ResponseCacheControl 'no-cache'>
|
||||
>>> cc.no_cache
|
||||
'*'
|
||||
>>> cc.no_cache = None
|
||||
>>> cc
|
||||
<ResponseCacheControl ''>
|
||||
|
||||
In versions before 0.5 the behavior documented here affected the now
|
||||
no longer existing `CacheControl` class.
|
||||
"""
|
||||
|
||||
no_cache = cache_control_property("no-cache", "*", None)
|
||||
no_store = cache_control_property("no-store", None, bool)
|
||||
max_age = cache_control_property("max-age", -1, int)
|
||||
no_transform = cache_control_property("no-transform", None, None)
|
||||
|
||||
def __init__(self, values=(), on_update=None):
|
||||
dict.__init__(self, values or ())
|
||||
self.on_update = on_update
|
||||
self.provided = values is not None
|
||||
|
||||
def _get_cache_value(self, key, empty, type):
|
||||
"""Used internally by the accessor properties."""
|
||||
if type is bool:
|
||||
return key in self
|
||||
if key in self:
|
||||
value = self[key]
|
||||
if value is None:
|
||||
return empty
|
||||
elif type is not None:
|
||||
try:
|
||||
value = type(value)
|
||||
except ValueError:
|
||||
pass
|
||||
return value
|
||||
return None
|
||||
|
||||
def _set_cache_value(self, key, value, type):
|
||||
"""Used internally by the accessor properties."""
|
||||
if type is bool:
|
||||
if value:
|
||||
self[key] = None
|
||||
else:
|
||||
self.pop(key, None)
|
||||
else:
|
||||
if value is None:
|
||||
self.pop(key, None)
|
||||
elif value is True:
|
||||
self[key] = None
|
||||
else:
|
||||
if type is not None:
|
||||
self[key] = type(value)
|
||||
else:
|
||||
self[key] = value
|
||||
|
||||
def _del_cache_value(self, key):
|
||||
"""Used internally by the accessor properties."""
|
||||
if key in self:
|
||||
del self[key]
|
||||
|
||||
def to_header(self):
|
||||
"""Convert the stored values into a cache control header."""
|
||||
return http.dump_header(self)
|
||||
|
||||
def __str__(self):
|
||||
return self.to_header()
|
||||
|
||||
def __repr__(self):
|
||||
kv_str = " ".join(f"{k}={v!r}" for k, v in sorted(self.items()))
|
||||
return f"<{type(self).__name__} {kv_str}>"
|
||||
|
||||
cache_property = staticmethod(cache_control_property)
|
||||
|
||||
|
||||
class RequestCacheControl(ImmutableDictMixin, _CacheControl):
|
||||
"""A cache control for requests. This is immutable and gives access
|
||||
to all the request-relevant cache control headers.
|
||||
|
||||
To get a header of the :class:`RequestCacheControl` object again you can
|
||||
convert the object into a string or call the :meth:`to_header` method. If
|
||||
you plan to subclass it and add your own items have a look at the sourcecode
|
||||
for that class.
|
||||
|
||||
.. versionchanged:: 2.1.0
|
||||
Setting int properties such as ``max_age`` will convert the
|
||||
value to an int.
|
||||
|
||||
.. versionadded:: 0.5
|
||||
In previous versions a `CacheControl` class existed that was used
|
||||
both for request and response.
|
||||
"""
|
||||
|
||||
max_stale = cache_control_property("max-stale", "*", int)
|
||||
min_fresh = cache_control_property("min-fresh", "*", int)
|
||||
only_if_cached = cache_control_property("only-if-cached", None, bool)
|
||||
|
||||
|
||||
class ResponseCacheControl(_CacheControl):
|
||||
"""A cache control for responses. Unlike :class:`RequestCacheControl`
|
||||
this is mutable and gives access to response-relevant cache control
|
||||
headers.
|
||||
|
||||
To get a header of the :class:`ResponseCacheControl` object again you can
|
||||
convert the object into a string or call the :meth:`to_header` method. If
|
||||
you plan to subclass it and add your own items have a look at the sourcecode
|
||||
for that class.
|
||||
|
||||
.. versionchanged:: 2.1.1
|
||||
``s_maxage`` converts the value to an int.
|
||||
|
||||
.. versionchanged:: 2.1.0
|
||||
Setting int properties such as ``max_age`` will convert the
|
||||
value to an int.
|
||||
|
||||
.. versionadded:: 0.5
|
||||
In previous versions a `CacheControl` class existed that was used
|
||||
both for request and response.
|
||||
"""
|
||||
|
||||
public = cache_control_property("public", None, bool)
|
||||
private = cache_control_property("private", "*", None)
|
||||
must_revalidate = cache_control_property("must-revalidate", None, bool)
|
||||
proxy_revalidate = cache_control_property("proxy-revalidate", None, bool)
|
||||
s_maxage = cache_control_property("s-maxage", None, int)
|
||||
immutable = cache_control_property("immutable", None, bool)
|
||||
|
||||
|
||||
# circular dependencies
|
||||
from .. import http
|
||||
@@ -0,0 +1,109 @@
|
||||
from collections.abc import Callable
|
||||
from collections.abc import Iterable
|
||||
from collections.abc import Mapping
|
||||
from typing import TypeVar
|
||||
|
||||
from .mixins import ImmutableDictMixin
|
||||
from .mixins import UpdateDictMixin
|
||||
|
||||
T = TypeVar("T")
|
||||
_CPT = TypeVar("_CPT", str, int, bool)
|
||||
_OptCPT = _CPT | None
|
||||
|
||||
def cache_control_property(key: str, empty: _OptCPT, type: type[_CPT]) -> property: ...
|
||||
|
||||
class _CacheControl(UpdateDictMixin[str, _OptCPT], dict[str, _OptCPT]):
|
||||
provided: bool
|
||||
def __init__(
|
||||
self,
|
||||
values: Mapping[str, _OptCPT] | Iterable[tuple[str, _OptCPT]] = (),
|
||||
on_update: Callable[[_CacheControl], None] | None = None,
|
||||
) -> None: ...
|
||||
@property
|
||||
def no_cache(self) -> bool | None: ...
|
||||
@no_cache.setter
|
||||
def no_cache(self, value: bool | None) -> None: ...
|
||||
@no_cache.deleter
|
||||
def no_cache(self) -> None: ...
|
||||
@property
|
||||
def no_store(self) -> bool | None: ...
|
||||
@no_store.setter
|
||||
def no_store(self, value: bool | None) -> None: ...
|
||||
@no_store.deleter
|
||||
def no_store(self) -> None: ...
|
||||
@property
|
||||
def max_age(self) -> int | None: ...
|
||||
@max_age.setter
|
||||
def max_age(self, value: int | None) -> None: ...
|
||||
@max_age.deleter
|
||||
def max_age(self) -> None: ...
|
||||
@property
|
||||
def no_transform(self) -> bool | None: ...
|
||||
@no_transform.setter
|
||||
def no_transform(self, value: bool | None) -> None: ...
|
||||
@no_transform.deleter
|
||||
def no_transform(self) -> None: ...
|
||||
def _get_cache_value(self, key: str, empty: T | None, type: type[T]) -> T: ...
|
||||
def _set_cache_value(self, key: str, value: T | None, type: type[T]) -> None: ...
|
||||
def _del_cache_value(self, key: str) -> None: ...
|
||||
def to_header(self) -> str: ...
|
||||
@staticmethod
|
||||
def cache_property(key: str, empty: _OptCPT, type: type[_CPT]) -> property: ...
|
||||
|
||||
class RequestCacheControl(ImmutableDictMixin[str, _OptCPT], _CacheControl):
|
||||
@property
|
||||
def max_stale(self) -> int | None: ...
|
||||
@max_stale.setter
|
||||
def max_stale(self, value: int | None) -> None: ...
|
||||
@max_stale.deleter
|
||||
def max_stale(self) -> None: ...
|
||||
@property
|
||||
def min_fresh(self) -> int | None: ...
|
||||
@min_fresh.setter
|
||||
def min_fresh(self, value: int | None) -> None: ...
|
||||
@min_fresh.deleter
|
||||
def min_fresh(self) -> None: ...
|
||||
@property
|
||||
def only_if_cached(self) -> bool | None: ...
|
||||
@only_if_cached.setter
|
||||
def only_if_cached(self, value: bool | None) -> None: ...
|
||||
@only_if_cached.deleter
|
||||
def only_if_cached(self) -> None: ...
|
||||
|
||||
class ResponseCacheControl(_CacheControl):
|
||||
@property
|
||||
def public(self) -> bool | None: ...
|
||||
@public.setter
|
||||
def public(self, value: bool | None) -> None: ...
|
||||
@public.deleter
|
||||
def public(self) -> None: ...
|
||||
@property
|
||||
def private(self) -> bool | None: ...
|
||||
@private.setter
|
||||
def private(self, value: bool | None) -> None: ...
|
||||
@private.deleter
|
||||
def private(self) -> None: ...
|
||||
@property
|
||||
def must_revalidate(self) -> bool | None: ...
|
||||
@must_revalidate.setter
|
||||
def must_revalidate(self, value: bool | None) -> None: ...
|
||||
@must_revalidate.deleter
|
||||
def must_revalidate(self) -> None: ...
|
||||
@property
|
||||
def proxy_revalidate(self) -> bool | None: ...
|
||||
@proxy_revalidate.setter
|
||||
def proxy_revalidate(self, value: bool | None) -> None: ...
|
||||
@proxy_revalidate.deleter
|
||||
def proxy_revalidate(self) -> None: ...
|
||||
@property
|
||||
def s_maxage(self) -> int | None: ...
|
||||
@s_maxage.setter
|
||||
def s_maxage(self, value: int | None) -> None: ...
|
||||
@s_maxage.deleter
|
||||
def s_maxage(self) -> None: ...
|
||||
@property
|
||||
def immutable(self) -> bool | None: ...
|
||||
@immutable.setter
|
||||
def immutable(self, value: bool | None) -> None: ...
|
||||
@immutable.deleter
|
||||
def immutable(self) -> None: ...
|
||||
@@ -0,0 +1,94 @@
|
||||
from __future__ import annotations
|
||||
|
||||
from .mixins import UpdateDictMixin
|
||||
|
||||
|
||||
def csp_property(key):
|
||||
"""Return a new property object for a content security policy header.
|
||||
Useful if you want to add support for a csp extension in a
|
||||
subclass.
|
||||
"""
|
||||
return property(
|
||||
lambda x: x._get_value(key),
|
||||
lambda x, v: x._set_value(key, v),
|
||||
lambda x: x._del_value(key),
|
||||
f"accessor for {key!r}",
|
||||
)
|
||||
|
||||
|
||||
class ContentSecurityPolicy(UpdateDictMixin, dict):
|
||||
"""Subclass of a dict that stores values for a Content Security Policy
|
||||
header. It has accessors for all the level 3 policies.
|
||||
|
||||
Because the csp directives in the HTTP header use dashes the
|
||||
python descriptors use underscores for that.
|
||||
|
||||
To get a header of the :class:`ContentSecuirtyPolicy` object again
|
||||
you can convert the object into a string or call the
|
||||
:meth:`to_header` method. If you plan to subclass it and add your
|
||||
own items have a look at the sourcecode for that class.
|
||||
|
||||
.. versionadded:: 1.0.0
|
||||
Support for Content Security Policy headers was added.
|
||||
|
||||
"""
|
||||
|
||||
base_uri = csp_property("base-uri")
|
||||
child_src = csp_property("child-src")
|
||||
connect_src = csp_property("connect-src")
|
||||
default_src = csp_property("default-src")
|
||||
font_src = csp_property("font-src")
|
||||
form_action = csp_property("form-action")
|
||||
frame_ancestors = csp_property("frame-ancestors")
|
||||
frame_src = csp_property("frame-src")
|
||||
img_src = csp_property("img-src")
|
||||
manifest_src = csp_property("manifest-src")
|
||||
media_src = csp_property("media-src")
|
||||
navigate_to = csp_property("navigate-to")
|
||||
object_src = csp_property("object-src")
|
||||
prefetch_src = csp_property("prefetch-src")
|
||||
plugin_types = csp_property("plugin-types")
|
||||
report_to = csp_property("report-to")
|
||||
report_uri = csp_property("report-uri")
|
||||
sandbox = csp_property("sandbox")
|
||||
script_src = csp_property("script-src")
|
||||
script_src_attr = csp_property("script-src-attr")
|
||||
script_src_elem = csp_property("script-src-elem")
|
||||
style_src = csp_property("style-src")
|
||||
style_src_attr = csp_property("style-src-attr")
|
||||
style_src_elem = csp_property("style-src-elem")
|
||||
worker_src = csp_property("worker-src")
|
||||
|
||||
def __init__(self, values=(), on_update=None):
|
||||
dict.__init__(self, values or ())
|
||||
self.on_update = on_update
|
||||
self.provided = values is not None
|
||||
|
||||
def _get_value(self, key):
|
||||
"""Used internally by the accessor properties."""
|
||||
return self.get(key)
|
||||
|
||||
def _set_value(self, key, value):
|
||||
"""Used internally by the accessor properties."""
|
||||
if value is None:
|
||||
self.pop(key, None)
|
||||
else:
|
||||
self[key] = value
|
||||
|
||||
def _del_value(self, key):
|
||||
"""Used internally by the accessor properties."""
|
||||
if key in self:
|
||||
del self[key]
|
||||
|
||||
def to_header(self):
|
||||
"""Convert the stored values into a cache control header."""
|
||||
from ..http import dump_csp_header
|
||||
|
||||
return dump_csp_header(self)
|
||||
|
||||
def __str__(self):
|
||||
return self.to_header()
|
||||
|
||||
def __repr__(self):
|
||||
kv_str = " ".join(f"{k}={v!r}" for k, v in sorted(self.items()))
|
||||
return f"<{type(self).__name__} {kv_str}>"
|
||||
@@ -0,0 +1,169 @@
|
||||
from collections.abc import Callable
|
||||
from collections.abc import Iterable
|
||||
from collections.abc import Mapping
|
||||
|
||||
from .mixins import UpdateDictMixin
|
||||
|
||||
def csp_property(key: str) -> property: ...
|
||||
|
||||
class ContentSecurityPolicy(UpdateDictMixin[str, str], dict[str, str]):
|
||||
@property
|
||||
def base_uri(self) -> str | None: ...
|
||||
@base_uri.setter
|
||||
def base_uri(self, value: str | None) -> None: ...
|
||||
@base_uri.deleter
|
||||
def base_uri(self) -> None: ...
|
||||
@property
|
||||
def child_src(self) -> str | None: ...
|
||||
@child_src.setter
|
||||
def child_src(self, value: str | None) -> None: ...
|
||||
@child_src.deleter
|
||||
def child_src(self) -> None: ...
|
||||
@property
|
||||
def connect_src(self) -> str | None: ...
|
||||
@connect_src.setter
|
||||
def connect_src(self, value: str | None) -> None: ...
|
||||
@connect_src.deleter
|
||||
def connect_src(self) -> None: ...
|
||||
@property
|
||||
def default_src(self) -> str | None: ...
|
||||
@default_src.setter
|
||||
def default_src(self, value: str | None) -> None: ...
|
||||
@default_src.deleter
|
||||
def default_src(self) -> None: ...
|
||||
@property
|
||||
def font_src(self) -> str | None: ...
|
||||
@font_src.setter
|
||||
def font_src(self, value: str | None) -> None: ...
|
||||
@font_src.deleter
|
||||
def font_src(self) -> None: ...
|
||||
@property
|
||||
def form_action(self) -> str | None: ...
|
||||
@form_action.setter
|
||||
def form_action(self, value: str | None) -> None: ...
|
||||
@form_action.deleter
|
||||
def form_action(self) -> None: ...
|
||||
@property
|
||||
def frame_ancestors(self) -> str | None: ...
|
||||
@frame_ancestors.setter
|
||||
def frame_ancestors(self, value: str | None) -> None: ...
|
||||
@frame_ancestors.deleter
|
||||
def frame_ancestors(self) -> None: ...
|
||||
@property
|
||||
def frame_src(self) -> str | None: ...
|
||||
@frame_src.setter
|
||||
def frame_src(self, value: str | None) -> None: ...
|
||||
@frame_src.deleter
|
||||
def frame_src(self) -> None: ...
|
||||
@property
|
||||
def img_src(self) -> str | None: ...
|
||||
@img_src.setter
|
||||
def img_src(self, value: str | None) -> None: ...
|
||||
@img_src.deleter
|
||||
def img_src(self) -> None: ...
|
||||
@property
|
||||
def manifest_src(self) -> str | None: ...
|
||||
@manifest_src.setter
|
||||
def manifest_src(self, value: str | None) -> None: ...
|
||||
@manifest_src.deleter
|
||||
def manifest_src(self) -> None: ...
|
||||
@property
|
||||
def media_src(self) -> str | None: ...
|
||||
@media_src.setter
|
||||
def media_src(self, value: str | None) -> None: ...
|
||||
@media_src.deleter
|
||||
def media_src(self) -> None: ...
|
||||
@property
|
||||
def navigate_to(self) -> str | None: ...
|
||||
@navigate_to.setter
|
||||
def navigate_to(self, value: str | None) -> None: ...
|
||||
@navigate_to.deleter
|
||||
def navigate_to(self) -> None: ...
|
||||
@property
|
||||
def object_src(self) -> str | None: ...
|
||||
@object_src.setter
|
||||
def object_src(self, value: str | None) -> None: ...
|
||||
@object_src.deleter
|
||||
def object_src(self) -> None: ...
|
||||
@property
|
||||
def prefetch_src(self) -> str | None: ...
|
||||
@prefetch_src.setter
|
||||
def prefetch_src(self, value: str | None) -> None: ...
|
||||
@prefetch_src.deleter
|
||||
def prefetch_src(self) -> None: ...
|
||||
@property
|
||||
def plugin_types(self) -> str | None: ...
|
||||
@plugin_types.setter
|
||||
def plugin_types(self, value: str | None) -> None: ...
|
||||
@plugin_types.deleter
|
||||
def plugin_types(self) -> None: ...
|
||||
@property
|
||||
def report_to(self) -> str | None: ...
|
||||
@report_to.setter
|
||||
def report_to(self, value: str | None) -> None: ...
|
||||
@report_to.deleter
|
||||
def report_to(self) -> None: ...
|
||||
@property
|
||||
def report_uri(self) -> str | None: ...
|
||||
@report_uri.setter
|
||||
def report_uri(self, value: str | None) -> None: ...
|
||||
@report_uri.deleter
|
||||
def report_uri(self) -> None: ...
|
||||
@property
|
||||
def sandbox(self) -> str | None: ...
|
||||
@sandbox.setter
|
||||
def sandbox(self, value: str | None) -> None: ...
|
||||
@sandbox.deleter
|
||||
def sandbox(self) -> None: ...
|
||||
@property
|
||||
def script_src(self) -> str | None: ...
|
||||
@script_src.setter
|
||||
def script_src(self, value: str | None) -> None: ...
|
||||
@script_src.deleter
|
||||
def script_src(self) -> None: ...
|
||||
@property
|
||||
def script_src_attr(self) -> str | None: ...
|
||||
@script_src_attr.setter
|
||||
def script_src_attr(self, value: str | None) -> None: ...
|
||||
@script_src_attr.deleter
|
||||
def script_src_attr(self) -> None: ...
|
||||
@property
|
||||
def script_src_elem(self) -> str | None: ...
|
||||
@script_src_elem.setter
|
||||
def script_src_elem(self, value: str | None) -> None: ...
|
||||
@script_src_elem.deleter
|
||||
def script_src_elem(self) -> None: ...
|
||||
@property
|
||||
def style_src(self) -> str | None: ...
|
||||
@style_src.setter
|
||||
def style_src(self, value: str | None) -> None: ...
|
||||
@style_src.deleter
|
||||
def style_src(self) -> None: ...
|
||||
@property
|
||||
def style_src_attr(self) -> str | None: ...
|
||||
@style_src_attr.setter
|
||||
def style_src_attr(self, value: str | None) -> None: ...
|
||||
@style_src_attr.deleter
|
||||
def style_src_attr(self) -> None: ...
|
||||
@property
|
||||
def style_src_elem(self) -> str | None: ...
|
||||
@style_src_elem.setter
|
||||
def style_src_elem(self, value: str | None) -> None: ...
|
||||
@style_src_elem.deleter
|
||||
def style_src_elem(self) -> None: ...
|
||||
@property
|
||||
def worker_src(self) -> str | None: ...
|
||||
@worker_src.setter
|
||||
def worker_src(self, value: str | None) -> None: ...
|
||||
@worker_src.deleter
|
||||
def worker_src(self) -> None: ...
|
||||
provided: bool
|
||||
def __init__(
|
||||
self,
|
||||
values: Mapping[str, str] | Iterable[tuple[str, str]] = (),
|
||||
on_update: Callable[[ContentSecurityPolicy], None] | None = None,
|
||||
) -> None: ...
|
||||
def _get_value(self, key: str) -> str | None: ...
|
||||
def _set_value(self, key: str, value: str) -> None: ...
|
||||
def _del_value(self, key: str) -> None: ...
|
||||
def to_header(self) -> str: ...
|
||||
@@ -0,0 +1,95 @@
|
||||
from __future__ import annotations
|
||||
|
||||
from collections.abc import Collection
|
||||
|
||||
|
||||
class ETags(Collection):
|
||||
"""A set that can be used to check if one etag is present in a collection
|
||||
of etags.
|
||||
"""
|
||||
|
||||
def __init__(self, strong_etags=None, weak_etags=None, star_tag=False):
|
||||
if not star_tag and strong_etags:
|
||||
self._strong = frozenset(strong_etags)
|
||||
else:
|
||||
self._strong = frozenset()
|
||||
|
||||
self._weak = frozenset(weak_etags or ())
|
||||
self.star_tag = star_tag
|
||||
|
||||
def as_set(self, include_weak=False):
|
||||
"""Convert the `ETags` object into a python set. Per default all the
|
||||
weak etags are not part of this set."""
|
||||
rv = set(self._strong)
|
||||
if include_weak:
|
||||
rv.update(self._weak)
|
||||
return rv
|
||||
|
||||
def is_weak(self, etag):
|
||||
"""Check if an etag is weak."""
|
||||
return etag in self._weak
|
||||
|
||||
def is_strong(self, etag):
|
||||
"""Check if an etag is strong."""
|
||||
return etag in self._strong
|
||||
|
||||
def contains_weak(self, etag):
|
||||
"""Check if an etag is part of the set including weak and strong tags."""
|
||||
return self.is_weak(etag) or self.contains(etag)
|
||||
|
||||
def contains(self, etag):
|
||||
"""Check if an etag is part of the set ignoring weak tags.
|
||||
It is also possible to use the ``in`` operator.
|
||||
"""
|
||||
if self.star_tag:
|
||||
return True
|
||||
return self.is_strong(etag)
|
||||
|
||||
def contains_raw(self, etag):
|
||||
"""When passed a quoted tag it will check if this tag is part of the
|
||||
set. If the tag is weak it is checked against weak and strong tags,
|
||||
otherwise strong only."""
|
||||
from ..http import unquote_etag
|
||||
|
||||
etag, weak = unquote_etag(etag)
|
||||
if weak:
|
||||
return self.contains_weak(etag)
|
||||
return self.contains(etag)
|
||||
|
||||
def to_header(self):
|
||||
"""Convert the etags set into a HTTP header string."""
|
||||
if self.star_tag:
|
||||
return "*"
|
||||
return ", ".join(
|
||||
[f'"{x}"' for x in self._strong] + [f'W/"{x}"' for x in self._weak]
|
||||
)
|
||||
|
||||
def __call__(self, etag=None, data=None, include_weak=False):
|
||||
if [etag, data].count(None) != 1:
|
||||
raise TypeError("either tag or data required, but at least one")
|
||||
if etag is None:
|
||||
from ..http import generate_etag
|
||||
|
||||
etag = generate_etag(data)
|
||||
if include_weak:
|
||||
if etag in self._weak:
|
||||
return True
|
||||
return etag in self._strong
|
||||
|
||||
def __bool__(self):
|
||||
return bool(self.star_tag or self._strong or self._weak)
|
||||
|
||||
def __str__(self):
|
||||
return self.to_header()
|
||||
|
||||
def __len__(self):
|
||||
return len(self._strong)
|
||||
|
||||
def __iter__(self):
|
||||
return iter(self._strong)
|
||||
|
||||
def __contains__(self, etag):
|
||||
return self.contains(etag)
|
||||
|
||||
def __repr__(self):
|
||||
return f"<{type(self).__name__} {str(self)!r}>"
|
||||
@@ -0,0 +1,30 @@
|
||||
from collections.abc import Collection
|
||||
from collections.abc import Iterable
|
||||
from collections.abc import Iterator
|
||||
|
||||
class ETags(Collection[str]):
|
||||
_strong: frozenset[str]
|
||||
_weak: frozenset[str]
|
||||
star_tag: bool
|
||||
def __init__(
|
||||
self,
|
||||
strong_etags: Iterable[str] | None = None,
|
||||
weak_etags: Iterable[str] | None = None,
|
||||
star_tag: bool = False,
|
||||
) -> None: ...
|
||||
def as_set(self, include_weak: bool = False) -> set[str]: ...
|
||||
def is_weak(self, etag: str) -> bool: ...
|
||||
def is_strong(self, etag: str) -> bool: ...
|
||||
def contains_weak(self, etag: str) -> bool: ...
|
||||
def contains(self, etag: str) -> bool: ...
|
||||
def contains_raw(self, etag: str) -> bool: ...
|
||||
def to_header(self) -> str: ...
|
||||
def __call__(
|
||||
self,
|
||||
etag: str | None = None,
|
||||
data: bytes | None = None,
|
||||
include_weak: bool = False,
|
||||
) -> bool: ...
|
||||
def __len__(self) -> int: ...
|
||||
def __iter__(self) -> Iterator[str]: ...
|
||||
def __contains__(self, item: str) -> bool: ... # type: ignore
|
||||
@@ -0,0 +1,196 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import mimetypes
|
||||
from io import BytesIO
|
||||
from os import fsdecode
|
||||
from os import fspath
|
||||
|
||||
from .._internal import _plain_int
|
||||
from .structures import MultiDict
|
||||
|
||||
|
||||
class FileStorage:
|
||||
"""The :class:`FileStorage` class is a thin wrapper over incoming files.
|
||||
It is used by the request object to represent uploaded files. All the
|
||||
attributes of the wrapper stream are proxied by the file storage so
|
||||
it's possible to do ``storage.read()`` instead of the long form
|
||||
``storage.stream.read()``.
|
||||
"""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
stream=None,
|
||||
filename=None,
|
||||
name=None,
|
||||
content_type=None,
|
||||
content_length=None,
|
||||
headers=None,
|
||||
):
|
||||
self.name = name
|
||||
self.stream = stream or BytesIO()
|
||||
|
||||
# If no filename is provided, attempt to get the filename from
|
||||
# the stream object. Python names special streams like
|
||||
# ``<stderr>`` with angular brackets, skip these streams.
|
||||
if filename is None:
|
||||
filename = getattr(stream, "name", None)
|
||||
|
||||
if filename is not None:
|
||||
filename = fsdecode(filename)
|
||||
|
||||
if filename and filename[0] == "<" and filename[-1] == ">":
|
||||
filename = None
|
||||
else:
|
||||
filename = fsdecode(filename)
|
||||
|
||||
self.filename = filename
|
||||
|
||||
if headers is None:
|
||||
from .headers import Headers
|
||||
|
||||
headers = Headers()
|
||||
self.headers = headers
|
||||
if content_type is not None:
|
||||
headers["Content-Type"] = content_type
|
||||
if content_length is not None:
|
||||
headers["Content-Length"] = str(content_length)
|
||||
|
||||
def _parse_content_type(self):
|
||||
if not hasattr(self, "_parsed_content_type"):
|
||||
self._parsed_content_type = http.parse_options_header(self.content_type)
|
||||
|
||||
@property
|
||||
def content_type(self):
|
||||
"""The content-type sent in the header. Usually not available"""
|
||||
return self.headers.get("content-type")
|
||||
|
||||
@property
|
||||
def content_length(self):
|
||||
"""The content-length sent in the header. Usually not available"""
|
||||
if "content-length" in self.headers:
|
||||
try:
|
||||
return _plain_int(self.headers["content-length"])
|
||||
except ValueError:
|
||||
pass
|
||||
|
||||
return 0
|
||||
|
||||
@property
|
||||
def mimetype(self):
|
||||
"""Like :attr:`content_type`, but without parameters (eg, without
|
||||
charset, type etc.) and always lowercase. For example if the content
|
||||
type is ``text/HTML; charset=utf-8`` the mimetype would be
|
||||
``'text/html'``.
|
||||
|
||||
.. versionadded:: 0.7
|
||||
"""
|
||||
self._parse_content_type()
|
||||
return self._parsed_content_type[0].lower()
|
||||
|
||||
@property
|
||||
def mimetype_params(self):
|
||||
"""The mimetype parameters as dict. For example if the content
|
||||
type is ``text/html; charset=utf-8`` the params would be
|
||||
``{'charset': 'utf-8'}``.
|
||||
|
||||
.. versionadded:: 0.7
|
||||
"""
|
||||
self._parse_content_type()
|
||||
return self._parsed_content_type[1]
|
||||
|
||||
def save(self, dst, buffer_size=16384):
|
||||
"""Save the file to a destination path or file object. If the
|
||||
destination is a file object you have to close it yourself after the
|
||||
call. The buffer size is the number of bytes held in memory during
|
||||
the copy process. It defaults to 16KB.
|
||||
|
||||
For secure file saving also have a look at :func:`secure_filename`.
|
||||
|
||||
:param dst: a filename, :class:`os.PathLike`, or open file
|
||||
object to write to.
|
||||
:param buffer_size: Passed as the ``length`` parameter of
|
||||
:func:`shutil.copyfileobj`.
|
||||
|
||||
.. versionchanged:: 1.0
|
||||
Supports :mod:`pathlib`.
|
||||
"""
|
||||
from shutil import copyfileobj
|
||||
|
||||
close_dst = False
|
||||
|
||||
if hasattr(dst, "__fspath__"):
|
||||
dst = fspath(dst)
|
||||
|
||||
if isinstance(dst, str):
|
||||
dst = open(dst, "wb")
|
||||
close_dst = True
|
||||
|
||||
try:
|
||||
copyfileobj(self.stream, dst, buffer_size)
|
||||
finally:
|
||||
if close_dst:
|
||||
dst.close()
|
||||
|
||||
def close(self):
|
||||
"""Close the underlying file if possible."""
|
||||
try:
|
||||
self.stream.close()
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
def __bool__(self):
|
||||
return bool(self.filename)
|
||||
|
||||
def __getattr__(self, name):
|
||||
try:
|
||||
return getattr(self.stream, name)
|
||||
except AttributeError:
|
||||
# SpooledTemporaryFile doesn't implement IOBase, get the
|
||||
# attribute from its backing file instead.
|
||||
# https://github.com/python/cpython/pull/3249
|
||||
if hasattr(self.stream, "_file"):
|
||||
return getattr(self.stream._file, name)
|
||||
raise
|
||||
|
||||
def __iter__(self):
|
||||
return iter(self.stream)
|
||||
|
||||
def __repr__(self):
|
||||
return f"<{type(self).__name__}: {self.filename!r} ({self.content_type!r})>"
|
||||
|
||||
|
||||
class FileMultiDict(MultiDict):
|
||||
"""A special :class:`MultiDict` that has convenience methods to add
|
||||
files to it. This is used for :class:`EnvironBuilder` and generally
|
||||
useful for unittesting.
|
||||
|
||||
.. versionadded:: 0.5
|
||||
"""
|
||||
|
||||
def add_file(self, name, file, filename=None, content_type=None):
|
||||
"""Adds a new file to the dict. `file` can be a file name or
|
||||
a :class:`file`-like or a :class:`FileStorage` object.
|
||||
|
||||
:param name: the name of the field.
|
||||
:param file: a filename or :class:`file`-like object
|
||||
:param filename: an optional filename
|
||||
:param content_type: an optional content type
|
||||
"""
|
||||
if isinstance(file, FileStorage):
|
||||
value = file
|
||||
else:
|
||||
if isinstance(file, str):
|
||||
if filename is None:
|
||||
filename = file
|
||||
file = open(file, "rb")
|
||||
if filename and content_type is None:
|
||||
content_type = (
|
||||
mimetypes.guess_type(filename)[0] or "application/octet-stream"
|
||||
)
|
||||
value = FileStorage(file, filename, name, content_type)
|
||||
|
||||
self.add(name, value)
|
||||
|
||||
|
||||
# circular dependencies
|
||||
from .. import http
|
||||
@@ -0,0 +1,47 @@
|
||||
from collections.abc import Iterator
|
||||
from os import PathLike
|
||||
from typing import Any
|
||||
from typing import IO
|
||||
|
||||
from .headers import Headers
|
||||
from .structures import MultiDict
|
||||
|
||||
class FileStorage:
|
||||
name: str | None
|
||||
stream: IO[bytes]
|
||||
filename: str | None
|
||||
headers: Headers
|
||||
_parsed_content_type: tuple[str, dict[str, str]]
|
||||
def __init__(
|
||||
self,
|
||||
stream: IO[bytes] | None = None,
|
||||
filename: str | PathLike | None = None,
|
||||
name: str | None = None,
|
||||
content_type: str | None = None,
|
||||
content_length: int | None = None,
|
||||
headers: Headers | None = None,
|
||||
) -> None: ...
|
||||
def _parse_content_type(self) -> None: ...
|
||||
@property
|
||||
def content_type(self) -> str: ...
|
||||
@property
|
||||
def content_length(self) -> int: ...
|
||||
@property
|
||||
def mimetype(self) -> str: ...
|
||||
@property
|
||||
def mimetype_params(self) -> dict[str, str]: ...
|
||||
def save(self, dst: str | PathLike | IO[bytes], buffer_size: int = ...) -> None: ...
|
||||
def close(self) -> None: ...
|
||||
def __bool__(self) -> bool: ...
|
||||
def __getattr__(self, name: str) -> Any: ...
|
||||
def __iter__(self) -> Iterator[bytes]: ...
|
||||
def __repr__(self) -> str: ...
|
||||
|
||||
class FileMultiDict(MultiDict[str, FileStorage]):
|
||||
def add_file(
|
||||
self,
|
||||
name: str,
|
||||
file: FileStorage | str | IO[bytes],
|
||||
filename: str | None = None,
|
||||
content_type: str | None = None,
|
||||
) -> None: ...
|
||||
@@ -0,0 +1,515 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import re
|
||||
import typing as t
|
||||
|
||||
from .._internal import _missing
|
||||
from ..exceptions import BadRequestKeyError
|
||||
from .mixins import ImmutableHeadersMixin
|
||||
from .structures import iter_multi_items
|
||||
from .structures import MultiDict
|
||||
|
||||
|
||||
class Headers:
|
||||
"""An object that stores some headers. It has a dict-like interface,
|
||||
but is ordered, can store the same key multiple times, and iterating
|
||||
yields ``(key, value)`` pairs instead of only keys.
|
||||
|
||||
This data structure is useful if you want a nicer way to handle WSGI
|
||||
headers which are stored as tuples in a list.
|
||||
|
||||
From Werkzeug 0.3 onwards, the :exc:`KeyError` raised by this class is
|
||||
also a subclass of the :class:`~exceptions.BadRequest` HTTP exception
|
||||
and will render a page for a ``400 BAD REQUEST`` if caught in a
|
||||
catch-all for HTTP exceptions.
|
||||
|
||||
Headers is mostly compatible with the Python :class:`wsgiref.headers.Headers`
|
||||
class, with the exception of `__getitem__`. :mod:`wsgiref` will return
|
||||
`None` for ``headers['missing']``, whereas :class:`Headers` will raise
|
||||
a :class:`KeyError`.
|
||||
|
||||
To create a new ``Headers`` object, pass it a list, dict, or
|
||||
other ``Headers`` object with default values. These values are
|
||||
validated the same way values added later are.
|
||||
|
||||
:param defaults: The list of default values for the :class:`Headers`.
|
||||
|
||||
.. versionchanged:: 2.1.0
|
||||
Default values are validated the same as values added later.
|
||||
|
||||
.. versionchanged:: 0.9
|
||||
This data structure now stores unicode values similar to how the
|
||||
multi dicts do it. The main difference is that bytes can be set as
|
||||
well which will automatically be latin1 decoded.
|
||||
|
||||
.. versionchanged:: 0.9
|
||||
The :meth:`linked` function was removed without replacement as it
|
||||
was an API that does not support the changes to the encoding model.
|
||||
"""
|
||||
|
||||
def __init__(self, defaults=None):
|
||||
self._list = []
|
||||
if defaults is not None:
|
||||
self.extend(defaults)
|
||||
|
||||
def __getitem__(self, key, _get_mode=False):
|
||||
if not _get_mode:
|
||||
if isinstance(key, int):
|
||||
return self._list[key]
|
||||
elif isinstance(key, slice):
|
||||
return self.__class__(self._list[key])
|
||||
if not isinstance(key, str):
|
||||
raise BadRequestKeyError(key)
|
||||
ikey = key.lower()
|
||||
for k, v in self._list:
|
||||
if k.lower() == ikey:
|
||||
return v
|
||||
# micro optimization: if we are in get mode we will catch that
|
||||
# exception one stack level down so we can raise a standard
|
||||
# key error instead of our special one.
|
||||
if _get_mode:
|
||||
raise KeyError()
|
||||
raise BadRequestKeyError(key)
|
||||
|
||||
def __eq__(self, other):
|
||||
def lowered(item):
|
||||
return (item[0].lower(),) + item[1:]
|
||||
|
||||
return other.__class__ is self.__class__ and set(
|
||||
map(lowered, other._list)
|
||||
) == set(map(lowered, self._list))
|
||||
|
||||
__hash__ = None
|
||||
|
||||
def get(self, key, default=None, type=None):
|
||||
"""Return the default value if the requested data doesn't exist.
|
||||
If `type` is provided and is a callable it should convert the value,
|
||||
return it or raise a :exc:`ValueError` if that is not possible. In
|
||||
this case the function will return the default as if the value was not
|
||||
found:
|
||||
|
||||
>>> d = Headers([('Content-Length', '42')])
|
||||
>>> d.get('Content-Length', type=int)
|
||||
42
|
||||
|
||||
:param key: The key to be looked up.
|
||||
:param default: The default value to be returned if the key can't
|
||||
be looked up. If not further specified `None` is
|
||||
returned.
|
||||
:param type: A callable that is used to cast the value in the
|
||||
:class:`Headers`. If a :exc:`ValueError` is raised
|
||||
by this callable the default value is returned.
|
||||
|
||||
.. versionchanged:: 3.0
|
||||
The ``as_bytes`` parameter was removed.
|
||||
|
||||
.. versionchanged:: 0.9
|
||||
The ``as_bytes`` parameter was added.
|
||||
"""
|
||||
try:
|
||||
rv = self.__getitem__(key, _get_mode=True)
|
||||
except KeyError:
|
||||
return default
|
||||
if type is None:
|
||||
return rv
|
||||
try:
|
||||
return type(rv)
|
||||
except ValueError:
|
||||
return default
|
||||
|
||||
def getlist(self, key, type=None):
|
||||
"""Return the list of items for a given key. If that key is not in the
|
||||
:class:`Headers`, the return value will be an empty list. Just like
|
||||
:meth:`get`, :meth:`getlist` accepts a `type` parameter. All items will
|
||||
be converted with the callable defined there.
|
||||
|
||||
:param key: The key to be looked up.
|
||||
:param type: A callable that is used to cast the value in the
|
||||
:class:`Headers`. If a :exc:`ValueError` is raised
|
||||
by this callable the value will be removed from the list.
|
||||
:return: a :class:`list` of all the values for the key.
|
||||
|
||||
.. versionchanged:: 3.0
|
||||
The ``as_bytes`` parameter was removed.
|
||||
|
||||
.. versionchanged:: 0.9
|
||||
The ``as_bytes`` parameter was added.
|
||||
"""
|
||||
ikey = key.lower()
|
||||
result = []
|
||||
for k, v in self:
|
||||
if k.lower() == ikey:
|
||||
if type is not None:
|
||||
try:
|
||||
v = type(v)
|
||||
except ValueError:
|
||||
continue
|
||||
result.append(v)
|
||||
return result
|
||||
|
||||
def get_all(self, name):
|
||||
"""Return a list of all the values for the named field.
|
||||
|
||||
This method is compatible with the :mod:`wsgiref`
|
||||
:meth:`~wsgiref.headers.Headers.get_all` method.
|
||||
"""
|
||||
return self.getlist(name)
|
||||
|
||||
def items(self, lower=False):
|
||||
for key, value in self:
|
||||
if lower:
|
||||
key = key.lower()
|
||||
yield key, value
|
||||
|
||||
def keys(self, lower=False):
|
||||
for key, _ in self.items(lower):
|
||||
yield key
|
||||
|
||||
def values(self):
|
||||
for _, value in self.items():
|
||||
yield value
|
||||
|
||||
def extend(self, *args, **kwargs):
|
||||
"""Extend headers in this object with items from another object
|
||||
containing header items as well as keyword arguments.
|
||||
|
||||
To replace existing keys instead of extending, use
|
||||
:meth:`update` instead.
|
||||
|
||||
If provided, the first argument can be another :class:`Headers`
|
||||
object, a :class:`MultiDict`, :class:`dict`, or iterable of
|
||||
pairs.
|
||||
|
||||
.. versionchanged:: 1.0
|
||||
Support :class:`MultiDict`. Allow passing ``kwargs``.
|
||||
"""
|
||||
if len(args) > 1:
|
||||
raise TypeError(f"update expected at most 1 arguments, got {len(args)}")
|
||||
|
||||
if args:
|
||||
for key, value in iter_multi_items(args[0]):
|
||||
self.add(key, value)
|
||||
|
||||
for key, value in iter_multi_items(kwargs):
|
||||
self.add(key, value)
|
||||
|
||||
def __delitem__(self, key, _index_operation=True):
|
||||
if _index_operation and isinstance(key, (int, slice)):
|
||||
del self._list[key]
|
||||
return
|
||||
key = key.lower()
|
||||
new = []
|
||||
for k, v in self._list:
|
||||
if k.lower() != key:
|
||||
new.append((k, v))
|
||||
self._list[:] = new
|
||||
|
||||
def remove(self, key):
|
||||
"""Remove a key.
|
||||
|
||||
:param key: The key to be removed.
|
||||
"""
|
||||
return self.__delitem__(key, _index_operation=False)
|
||||
|
||||
def pop(self, key=None, default=_missing):
|
||||
"""Removes and returns a key or index.
|
||||
|
||||
:param key: The key to be popped. If this is an integer the item at
|
||||
that position is removed, if it's a string the value for
|
||||
that key is. If the key is omitted or `None` the last
|
||||
item is removed.
|
||||
:return: an item.
|
||||
"""
|
||||
if key is None:
|
||||
return self._list.pop()
|
||||
if isinstance(key, int):
|
||||
return self._list.pop(key)
|
||||
try:
|
||||
rv = self[key]
|
||||
self.remove(key)
|
||||
except KeyError:
|
||||
if default is not _missing:
|
||||
return default
|
||||
raise
|
||||
return rv
|
||||
|
||||
def popitem(self):
|
||||
"""Removes a key or index and returns a (key, value) item."""
|
||||
return self.pop()
|
||||
|
||||
def __contains__(self, key):
|
||||
"""Check if a key is present."""
|
||||
try:
|
||||
self.__getitem__(key, _get_mode=True)
|
||||
except KeyError:
|
||||
return False
|
||||
return True
|
||||
|
||||
def __iter__(self):
|
||||
"""Yield ``(key, value)`` tuples."""
|
||||
return iter(self._list)
|
||||
|
||||
def __len__(self):
|
||||
return len(self._list)
|
||||
|
||||
def add(self, _key, _value, **kw):
|
||||
"""Add a new header tuple to the list.
|
||||
|
||||
Keyword arguments can specify additional parameters for the header
|
||||
value, with underscores converted to dashes::
|
||||
|
||||
>>> d = Headers()
|
||||
>>> d.add('Content-Type', 'text/plain')
|
||||
>>> d.add('Content-Disposition', 'attachment', filename='foo.png')
|
||||
|
||||
The keyword argument dumping uses :func:`dump_options_header`
|
||||
behind the scenes.
|
||||
|
||||
.. versionadded:: 0.4.1
|
||||
keyword arguments were added for :mod:`wsgiref` compatibility.
|
||||
"""
|
||||
if kw:
|
||||
_value = _options_header_vkw(_value, kw)
|
||||
_value = _str_header_value(_value)
|
||||
self._list.append((_key, _value))
|
||||
|
||||
def add_header(self, _key, _value, **_kw):
|
||||
"""Add a new header tuple to the list.
|
||||
|
||||
An alias for :meth:`add` for compatibility with the :mod:`wsgiref`
|
||||
:meth:`~wsgiref.headers.Headers.add_header` method.
|
||||
"""
|
||||
self.add(_key, _value, **_kw)
|
||||
|
||||
def clear(self):
|
||||
"""Clears all headers."""
|
||||
del self._list[:]
|
||||
|
||||
def set(self, _key, _value, **kw):
|
||||
"""Remove all header tuples for `key` and add a new one. The newly
|
||||
added key either appears at the end of the list if there was no
|
||||
entry or replaces the first one.
|
||||
|
||||
Keyword arguments can specify additional parameters for the header
|
||||
value, with underscores converted to dashes. See :meth:`add` for
|
||||
more information.
|
||||
|
||||
.. versionchanged:: 0.6.1
|
||||
:meth:`set` now accepts the same arguments as :meth:`add`.
|
||||
|
||||
:param key: The key to be inserted.
|
||||
:param value: The value to be inserted.
|
||||
"""
|
||||
if kw:
|
||||
_value = _options_header_vkw(_value, kw)
|
||||
_value = _str_header_value(_value)
|
||||
if not self._list:
|
||||
self._list.append((_key, _value))
|
||||
return
|
||||
listiter = iter(self._list)
|
||||
ikey = _key.lower()
|
||||
for idx, (old_key, _old_value) in enumerate(listiter):
|
||||
if old_key.lower() == ikey:
|
||||
# replace first occurrence
|
||||
self._list[idx] = (_key, _value)
|
||||
break
|
||||
else:
|
||||
self._list.append((_key, _value))
|
||||
return
|
||||
self._list[idx + 1 :] = [t for t in listiter if t[0].lower() != ikey]
|
||||
|
||||
def setlist(self, key, values):
|
||||
"""Remove any existing values for a header and add new ones.
|
||||
|
||||
:param key: The header key to set.
|
||||
:param values: An iterable of values to set for the key.
|
||||
|
||||
.. versionadded:: 1.0
|
||||
"""
|
||||
if values:
|
||||
values_iter = iter(values)
|
||||
self.set(key, next(values_iter))
|
||||
|
||||
for value in values_iter:
|
||||
self.add(key, value)
|
||||
else:
|
||||
self.remove(key)
|
||||
|
||||
def setdefault(self, key, default):
|
||||
"""Return the first value for the key if it is in the headers,
|
||||
otherwise set the header to the value given by ``default`` and
|
||||
return that.
|
||||
|
||||
:param key: The header key to get.
|
||||
:param default: The value to set for the key if it is not in the
|
||||
headers.
|
||||
"""
|
||||
if key in self:
|
||||
return self[key]
|
||||
|
||||
self.set(key, default)
|
||||
return default
|
||||
|
||||
def setlistdefault(self, key, default):
|
||||
"""Return the list of values for the key if it is in the
|
||||
headers, otherwise set the header to the list of values given
|
||||
by ``default`` and return that.
|
||||
|
||||
Unlike :meth:`MultiDict.setlistdefault`, modifying the returned
|
||||
list will not affect the headers.
|
||||
|
||||
:param key: The header key to get.
|
||||
:param default: An iterable of values to set for the key if it
|
||||
is not in the headers.
|
||||
|
||||
.. versionadded:: 1.0
|
||||
"""
|
||||
if key not in self:
|
||||
self.setlist(key, default)
|
||||
|
||||
return self.getlist(key)
|
||||
|
||||
def __setitem__(self, key, value):
|
||||
"""Like :meth:`set` but also supports index/slice based setting."""
|
||||
if isinstance(key, (slice, int)):
|
||||
if isinstance(key, int):
|
||||
value = [value]
|
||||
value = [(k, _str_header_value(v)) for (k, v) in value]
|
||||
if isinstance(key, int):
|
||||
self._list[key] = value[0]
|
||||
else:
|
||||
self._list[key] = value
|
||||
else:
|
||||
self.set(key, value)
|
||||
|
||||
def update(self, *args, **kwargs):
|
||||
"""Replace headers in this object with items from another
|
||||
headers object and keyword arguments.
|
||||
|
||||
To extend existing keys instead of replacing, use :meth:`extend`
|
||||
instead.
|
||||
|
||||
If provided, the first argument can be another :class:`Headers`
|
||||
object, a :class:`MultiDict`, :class:`dict`, or iterable of
|
||||
pairs.
|
||||
|
||||
.. versionadded:: 1.0
|
||||
"""
|
||||
if len(args) > 1:
|
||||
raise TypeError(f"update expected at most 1 arguments, got {len(args)}")
|
||||
|
||||
if args:
|
||||
mapping = args[0]
|
||||
|
||||
if isinstance(mapping, (Headers, MultiDict)):
|
||||
for key in mapping.keys():
|
||||
self.setlist(key, mapping.getlist(key))
|
||||
elif isinstance(mapping, dict):
|
||||
for key, value in mapping.items():
|
||||
if isinstance(value, (list, tuple)):
|
||||
self.setlist(key, value)
|
||||
else:
|
||||
self.set(key, value)
|
||||
else:
|
||||
for key, value in mapping:
|
||||
self.set(key, value)
|
||||
|
||||
for key, value in kwargs.items():
|
||||
if isinstance(value, (list, tuple)):
|
||||
self.setlist(key, value)
|
||||
else:
|
||||
self.set(key, value)
|
||||
|
||||
def to_wsgi_list(self):
|
||||
"""Convert the headers into a list suitable for WSGI.
|
||||
|
||||
:return: list
|
||||
"""
|
||||
return list(self)
|
||||
|
||||
def copy(self):
|
||||
return self.__class__(self._list)
|
||||
|
||||
def __copy__(self):
|
||||
return self.copy()
|
||||
|
||||
def __str__(self):
|
||||
"""Returns formatted headers suitable for HTTP transmission."""
|
||||
strs = []
|
||||
for key, value in self.to_wsgi_list():
|
||||
strs.append(f"{key}: {value}")
|
||||
strs.append("\r\n")
|
||||
return "\r\n".join(strs)
|
||||
|
||||
def __repr__(self):
|
||||
return f"{type(self).__name__}({list(self)!r})"
|
||||
|
||||
|
||||
def _options_header_vkw(value: str, kw: dict[str, t.Any]):
|
||||
return http.dump_options_header(
|
||||
value, {k.replace("_", "-"): v for k, v in kw.items()}
|
||||
)
|
||||
|
||||
|
||||
_newline_re = re.compile(r"[\r\n]")
|
||||
|
||||
|
||||
def _str_header_value(value: t.Any) -> str:
|
||||
if not isinstance(value, str):
|
||||
value = str(value)
|
||||
|
||||
if _newline_re.search(value) is not None:
|
||||
raise ValueError("Header values must not contain newline characters.")
|
||||
|
||||
return value
|
||||
|
||||
|
||||
class EnvironHeaders(ImmutableHeadersMixin, Headers):
|
||||
"""Read only version of the headers from a WSGI environment. This
|
||||
provides the same interface as `Headers` and is constructed from
|
||||
a WSGI environment.
|
||||
From Werkzeug 0.3 onwards, the `KeyError` raised by this class is also a
|
||||
subclass of the :exc:`~exceptions.BadRequest` HTTP exception and will
|
||||
render a page for a ``400 BAD REQUEST`` if caught in a catch-all for
|
||||
HTTP exceptions.
|
||||
"""
|
||||
|
||||
def __init__(self, environ):
|
||||
self.environ = environ
|
||||
|
||||
def __eq__(self, other):
|
||||
return self.environ is other.environ
|
||||
|
||||
__hash__ = None
|
||||
|
||||
def __getitem__(self, key, _get_mode=False):
|
||||
# _get_mode is a no-op for this class as there is no index but
|
||||
# used because get() calls it.
|
||||
if not isinstance(key, str):
|
||||
raise KeyError(key)
|
||||
key = key.upper().replace("-", "_")
|
||||
if key in {"CONTENT_TYPE", "CONTENT_LENGTH"}:
|
||||
return self.environ[key]
|
||||
return self.environ[f"HTTP_{key}"]
|
||||
|
||||
def __len__(self):
|
||||
# the iter is necessary because otherwise list calls our
|
||||
# len which would call list again and so forth.
|
||||
return len(list(iter(self)))
|
||||
|
||||
def __iter__(self):
|
||||
for key, value in self.environ.items():
|
||||
if key.startswith("HTTP_") and key not in {
|
||||
"HTTP_CONTENT_TYPE",
|
||||
"HTTP_CONTENT_LENGTH",
|
||||
}:
|
||||
yield key[5:].replace("_", "-").title(), value
|
||||
elif key in {"CONTENT_TYPE", "CONTENT_LENGTH"} and value:
|
||||
yield key.replace("_", "-").title(), value
|
||||
|
||||
def copy(self):
|
||||
raise TypeError(f"cannot create {type(self).__name__!r} copies")
|
||||
|
||||
|
||||
# circular dependencies
|
||||
from .. import http
|
||||
@@ -0,0 +1,109 @@
|
||||
from collections.abc import Callable
|
||||
from collections.abc import Iterable
|
||||
from collections.abc import Iterator
|
||||
from collections.abc import Mapping
|
||||
from typing import Literal
|
||||
from typing import NoReturn
|
||||
from typing import overload
|
||||
from typing import TypeVar
|
||||
|
||||
from _typeshed import SupportsKeysAndGetItem
|
||||
from _typeshed.wsgi import WSGIEnvironment
|
||||
|
||||
from .mixins import ImmutableHeadersMixin
|
||||
|
||||
D = TypeVar("D")
|
||||
T = TypeVar("T")
|
||||
|
||||
class Headers(dict[str, str]):
|
||||
_list: list[tuple[str, str]]
|
||||
def __init__(
|
||||
self,
|
||||
defaults: Mapping[str, str | Iterable[str]]
|
||||
| Iterable[tuple[str, str]]
|
||||
| None = None,
|
||||
) -> None: ...
|
||||
@overload
|
||||
def __getitem__(self, key: str) -> str: ...
|
||||
@overload
|
||||
def __getitem__(self, key: int) -> tuple[str, str]: ...
|
||||
@overload
|
||||
def __getitem__(self, key: slice) -> Headers: ...
|
||||
@overload
|
||||
def __getitem__(self, key: str, _get_mode: Literal[True] = ...) -> str: ...
|
||||
def __eq__(self, other: object) -> bool: ...
|
||||
@overload # type: ignore
|
||||
def get(self, key: str, default: str) -> str: ...
|
||||
@overload
|
||||
def get(self, key: str, default: str | None = None) -> str | None: ...
|
||||
@overload
|
||||
def get(
|
||||
self, key: str, default: T | None = None, type: Callable[[str], T] = ...
|
||||
) -> T | None: ...
|
||||
@overload
|
||||
def getlist(self, key: str) -> list[str]: ...
|
||||
@overload
|
||||
def getlist(self, key: str, type: Callable[[str], T]) -> list[T]: ...
|
||||
def get_all(self, name: str) -> list[str]: ...
|
||||
def items( # type: ignore
|
||||
self, lower: bool = False
|
||||
) -> Iterator[tuple[str, str]]: ...
|
||||
def keys(self, lower: bool = False) -> Iterator[str]: ... # type: ignore
|
||||
def values(self) -> Iterator[str]: ... # type: ignore
|
||||
def extend(
|
||||
self,
|
||||
*args: Mapping[str, str | Iterable[str]] | Iterable[tuple[str, str]],
|
||||
**kwargs: str | Iterable[str],
|
||||
) -> None: ...
|
||||
@overload
|
||||
def __delitem__(self, key: str | int | slice) -> None: ...
|
||||
@overload
|
||||
def __delitem__(self, key: str, _index_operation: Literal[False]) -> None: ...
|
||||
def remove(self, key: str) -> None: ...
|
||||
@overload # type: ignore
|
||||
def pop(self, key: str, default: str | None = None) -> str: ...
|
||||
@overload
|
||||
def pop(
|
||||
self, key: int | None = None, default: tuple[str, str] | None = None
|
||||
) -> tuple[str, str]: ...
|
||||
def popitem(self) -> tuple[str, str]: ...
|
||||
def __contains__(self, key: str) -> bool: ... # type: ignore
|
||||
def has_key(self, key: str) -> bool: ...
|
||||
def __iter__(self) -> Iterator[tuple[str, str]]: ... # type: ignore
|
||||
def add(self, _key: str, _value: str, **kw: str) -> None: ...
|
||||
def _validate_value(self, value: str) -> None: ...
|
||||
def add_header(self, _key: str, _value: str, **_kw: str) -> None: ...
|
||||
def clear(self) -> None: ...
|
||||
def set(self, _key: str, _value: str, **kw: str) -> None: ...
|
||||
def setlist(self, key: str, values: Iterable[str]) -> None: ...
|
||||
def setdefault(self, key: str, default: str) -> str: ...
|
||||
def setlistdefault(self, key: str, default: Iterable[str]) -> None: ...
|
||||
@overload
|
||||
def __setitem__(self, key: str, value: str) -> None: ...
|
||||
@overload
|
||||
def __setitem__(self, key: int, value: tuple[str, str]) -> None: ...
|
||||
@overload
|
||||
def __setitem__(self, key: slice, value: Iterable[tuple[str, str]]) -> None: ...
|
||||
@overload
|
||||
def update(
|
||||
self, __m: SupportsKeysAndGetItem[str, str], **kwargs: str | Iterable[str]
|
||||
) -> None: ...
|
||||
@overload
|
||||
def update(
|
||||
self, __m: Iterable[tuple[str, str]], **kwargs: str | Iterable[str]
|
||||
) -> None: ...
|
||||
@overload
|
||||
def update(self, **kwargs: str | Iterable[str]) -> None: ...
|
||||
def to_wsgi_list(self) -> list[tuple[str, str]]: ...
|
||||
def copy(self) -> Headers: ...
|
||||
def __copy__(self) -> Headers: ...
|
||||
|
||||
class EnvironHeaders(ImmutableHeadersMixin, Headers):
|
||||
environ: WSGIEnvironment
|
||||
def __init__(self, environ: WSGIEnvironment) -> None: ...
|
||||
def __eq__(self, other: object) -> bool: ...
|
||||
def __getitem__( # type: ignore
|
||||
self, key: str, _get_mode: Literal[False] = False
|
||||
) -> str: ...
|
||||
def __iter__(self) -> Iterator[tuple[str, str]]: ... # type: ignore
|
||||
def copy(self) -> NoReturn: ...
|
||||
@@ -0,0 +1,242 @@
|
||||
from __future__ import annotations
|
||||
|
||||
from itertools import repeat
|
||||
|
||||
from .._internal import _missing
|
||||
|
||||
|
||||
def is_immutable(self):
|
||||
raise TypeError(f"{type(self).__name__!r} objects are immutable")
|
||||
|
||||
|
||||
class ImmutableListMixin:
|
||||
"""Makes a :class:`list` immutable.
|
||||
|
||||
.. versionadded:: 0.5
|
||||
|
||||
:private:
|
||||
"""
|
||||
|
||||
_hash_cache = None
|
||||
|
||||
def __hash__(self):
|
||||
if self._hash_cache is not None:
|
||||
return self._hash_cache
|
||||
rv = self._hash_cache = hash(tuple(self))
|
||||
return rv
|
||||
|
||||
def __reduce_ex__(self, protocol):
|
||||
return type(self), (list(self),)
|
||||
|
||||
def __delitem__(self, key):
|
||||
is_immutable(self)
|
||||
|
||||
def __iadd__(self, other):
|
||||
is_immutable(self)
|
||||
|
||||
def __imul__(self, other):
|
||||
is_immutable(self)
|
||||
|
||||
def __setitem__(self, key, value):
|
||||
is_immutable(self)
|
||||
|
||||
def append(self, item):
|
||||
is_immutable(self)
|
||||
|
||||
def remove(self, item):
|
||||
is_immutable(self)
|
||||
|
||||
def extend(self, iterable):
|
||||
is_immutable(self)
|
||||
|
||||
def insert(self, pos, value):
|
||||
is_immutable(self)
|
||||
|
||||
def pop(self, index=-1):
|
||||
is_immutable(self)
|
||||
|
||||
def reverse(self):
|
||||
is_immutable(self)
|
||||
|
||||
def sort(self, key=None, reverse=False):
|
||||
is_immutable(self)
|
||||
|
||||
|
||||
class ImmutableDictMixin:
|
||||
"""Makes a :class:`dict` immutable.
|
||||
|
||||
.. versionadded:: 0.5
|
||||
|
||||
:private:
|
||||
"""
|
||||
|
||||
_hash_cache = None
|
||||
|
||||
@classmethod
|
||||
def fromkeys(cls, keys, value=None):
|
||||
instance = super().__new__(cls)
|
||||
instance.__init__(zip(keys, repeat(value)))
|
||||
return instance
|
||||
|
||||
def __reduce_ex__(self, protocol):
|
||||
return type(self), (dict(self),)
|
||||
|
||||
def _iter_hashitems(self):
|
||||
return self.items()
|
||||
|
||||
def __hash__(self):
|
||||
if self._hash_cache is not None:
|
||||
return self._hash_cache
|
||||
rv = self._hash_cache = hash(frozenset(self._iter_hashitems()))
|
||||
return rv
|
||||
|
||||
def setdefault(self, key, default=None):
|
||||
is_immutable(self)
|
||||
|
||||
def update(self, *args, **kwargs):
|
||||
is_immutable(self)
|
||||
|
||||
def pop(self, key, default=None):
|
||||
is_immutable(self)
|
||||
|
||||
def popitem(self):
|
||||
is_immutable(self)
|
||||
|
||||
def __setitem__(self, key, value):
|
||||
is_immutable(self)
|
||||
|
||||
def __delitem__(self, key):
|
||||
is_immutable(self)
|
||||
|
||||
def clear(self):
|
||||
is_immutable(self)
|
||||
|
||||
|
||||
class ImmutableMultiDictMixin(ImmutableDictMixin):
|
||||
"""Makes a :class:`MultiDict` immutable.
|
||||
|
||||
.. versionadded:: 0.5
|
||||
|
||||
:private:
|
||||
"""
|
||||
|
||||
def __reduce_ex__(self, protocol):
|
||||
return type(self), (list(self.items(multi=True)),)
|
||||
|
||||
def _iter_hashitems(self):
|
||||
return self.items(multi=True)
|
||||
|
||||
def add(self, key, value):
|
||||
is_immutable(self)
|
||||
|
||||
def popitemlist(self):
|
||||
is_immutable(self)
|
||||
|
||||
def poplist(self, key):
|
||||
is_immutable(self)
|
||||
|
||||
def setlist(self, key, new_list):
|
||||
is_immutable(self)
|
||||
|
||||
def setlistdefault(self, key, default_list=None):
|
||||
is_immutable(self)
|
||||
|
||||
|
||||
class ImmutableHeadersMixin:
|
||||
"""Makes a :class:`Headers` immutable. We do not mark them as
|
||||
hashable though since the only usecase for this datastructure
|
||||
in Werkzeug is a view on a mutable structure.
|
||||
|
||||
.. versionadded:: 0.5
|
||||
|
||||
:private:
|
||||
"""
|
||||
|
||||
def __delitem__(self, key, **kwargs):
|
||||
is_immutable(self)
|
||||
|
||||
def __setitem__(self, key, value):
|
||||
is_immutable(self)
|
||||
|
||||
def set(self, _key, _value, **kwargs):
|
||||
is_immutable(self)
|
||||
|
||||
def setlist(self, key, values):
|
||||
is_immutable(self)
|
||||
|
||||
def add(self, _key, _value, **kwargs):
|
||||
is_immutable(self)
|
||||
|
||||
def add_header(self, _key, _value, **_kwargs):
|
||||
is_immutable(self)
|
||||
|
||||
def remove(self, key):
|
||||
is_immutable(self)
|
||||
|
||||
def extend(self, *args, **kwargs):
|
||||
is_immutable(self)
|
||||
|
||||
def update(self, *args, **kwargs):
|
||||
is_immutable(self)
|
||||
|
||||
def insert(self, pos, value):
|
||||
is_immutable(self)
|
||||
|
||||
def pop(self, key=None, default=_missing):
|
||||
is_immutable(self)
|
||||
|
||||
def popitem(self):
|
||||
is_immutable(self)
|
||||
|
||||
def setdefault(self, key, default):
|
||||
is_immutable(self)
|
||||
|
||||
def setlistdefault(self, key, default):
|
||||
is_immutable(self)
|
||||
|
||||
|
||||
def _calls_update(name):
|
||||
def oncall(self, *args, **kw):
|
||||
rv = getattr(super(UpdateDictMixin, self), name)(*args, **kw)
|
||||
|
||||
if self.on_update is not None:
|
||||
self.on_update(self)
|
||||
|
||||
return rv
|
||||
|
||||
oncall.__name__ = name
|
||||
return oncall
|
||||
|
||||
|
||||
class UpdateDictMixin(dict):
|
||||
"""Makes dicts call `self.on_update` on modifications.
|
||||
|
||||
.. versionadded:: 0.5
|
||||
|
||||
:private:
|
||||
"""
|
||||
|
||||
on_update = None
|
||||
|
||||
def setdefault(self, key, default=None):
|
||||
modified = key not in self
|
||||
rv = super().setdefault(key, default)
|
||||
if modified and self.on_update is not None:
|
||||
self.on_update(self)
|
||||
return rv
|
||||
|
||||
def pop(self, key, default=_missing):
|
||||
modified = key in self
|
||||
if default is _missing:
|
||||
rv = super().pop(key)
|
||||
else:
|
||||
rv = super().pop(key, default)
|
||||
if modified and self.on_update is not None:
|
||||
self.on_update(self)
|
||||
return rv
|
||||
|
||||
__setitem__ = _calls_update("__setitem__")
|
||||
__delitem__ = _calls_update("__delitem__")
|
||||
clear = _calls_update("clear")
|
||||
popitem = _calls_update("popitem")
|
||||
update = _calls_update("update")
|
||||
@@ -0,0 +1,97 @@
|
||||
from collections.abc import Callable
|
||||
from collections.abc import Hashable
|
||||
from collections.abc import Iterable
|
||||
from typing import Any
|
||||
from typing import NoReturn
|
||||
from typing import overload
|
||||
from typing import SupportsIndex
|
||||
from typing import TypeVar
|
||||
|
||||
from _typeshed import SupportsKeysAndGetItem
|
||||
|
||||
from .headers import Headers
|
||||
|
||||
K = TypeVar("K")
|
||||
T = TypeVar("T")
|
||||
V = TypeVar("V")
|
||||
|
||||
def is_immutable(self: object) -> NoReturn: ...
|
||||
|
||||
class ImmutableListMixin(list[V]):
|
||||
_hash_cache: int | None
|
||||
def __hash__(self) -> int: ... # type: ignore
|
||||
def __delitem__(self, key: SupportsIndex | slice) -> NoReturn: ...
|
||||
def __iadd__(self, other: t.Any) -> NoReturn: ... # type: ignore
|
||||
def __imul__(self, other: SupportsIndex) -> NoReturn: ...
|
||||
def __setitem__(self, key: int | slice, value: V) -> NoReturn: ... # type: ignore
|
||||
def append(self, value: V) -> NoReturn: ...
|
||||
def remove(self, value: V) -> NoReturn: ...
|
||||
def extend(self, values: Iterable[V]) -> NoReturn: ...
|
||||
def insert(self, pos: SupportsIndex, value: V) -> NoReturn: ...
|
||||
def pop(self, index: SupportsIndex = -1) -> NoReturn: ...
|
||||
def reverse(self) -> NoReturn: ...
|
||||
def sort(
|
||||
self, key: Callable[[V], Any] | None = None, reverse: bool = False
|
||||
) -> NoReturn: ...
|
||||
|
||||
class ImmutableDictMixin(dict[K, V]):
|
||||
_hash_cache: int | None
|
||||
@classmethod
|
||||
def fromkeys( # type: ignore
|
||||
cls, keys: Iterable[K], value: V | None = None
|
||||
) -> ImmutableDictMixin[K, V]: ...
|
||||
def _iter_hashitems(self) -> Iterable[Hashable]: ...
|
||||
def __hash__(self) -> int: ... # type: ignore
|
||||
def setdefault(self, key: K, default: V | None = None) -> NoReturn: ...
|
||||
def update(self, *args: Any, **kwargs: V) -> NoReturn: ...
|
||||
def pop(self, key: K, default: V | None = None) -> NoReturn: ... # type: ignore
|
||||
def popitem(self) -> NoReturn: ...
|
||||
def __setitem__(self, key: K, value: V) -> NoReturn: ...
|
||||
def __delitem__(self, key: K) -> NoReturn: ...
|
||||
def clear(self) -> NoReturn: ...
|
||||
|
||||
class ImmutableMultiDictMixin(ImmutableDictMixin[K, V]):
|
||||
def _iter_hashitems(self) -> Iterable[Hashable]: ...
|
||||
def add(self, key: K, value: V) -> NoReturn: ...
|
||||
def popitemlist(self) -> NoReturn: ...
|
||||
def poplist(self, key: K) -> NoReturn: ...
|
||||
def setlist(self, key: K, new_list: Iterable[V]) -> NoReturn: ...
|
||||
def setlistdefault(
|
||||
self, key: K, default_list: Iterable[V] | None = None
|
||||
) -> NoReturn: ...
|
||||
|
||||
class ImmutableHeadersMixin(Headers):
|
||||
def __delitem__(self, key: Any, _index_operation: bool = True) -> NoReturn: ...
|
||||
def __setitem__(self, key: Any, value: Any) -> NoReturn: ...
|
||||
def set(self, _key: Any, _value: Any, **kw: Any) -> NoReturn: ...
|
||||
def setlist(self, key: Any, values: Any) -> NoReturn: ...
|
||||
def add(self, _key: Any, _value: Any, **kw: Any) -> NoReturn: ...
|
||||
def add_header(self, _key: Any, _value: Any, **_kw: Any) -> NoReturn: ...
|
||||
def remove(self, key: Any) -> NoReturn: ...
|
||||
def extend(self, *args: Any, **kwargs: Any) -> NoReturn: ...
|
||||
def update(self, *args: Any, **kwargs: Any) -> NoReturn: ...
|
||||
def insert(self, pos: Any, value: Any) -> NoReturn: ...
|
||||
def pop(self, key: Any = None, default: Any = ...) -> NoReturn: ...
|
||||
def popitem(self) -> NoReturn: ...
|
||||
def setdefault(self, key: Any, default: Any) -> NoReturn: ...
|
||||
def setlistdefault(self, key: Any, default: Any) -> NoReturn: ...
|
||||
|
||||
def _calls_update(name: str) -> Callable[[UpdateDictMixin[K, V]], Any]: ...
|
||||
|
||||
class UpdateDictMixin(dict[K, V]):
|
||||
on_update: Callable[[UpdateDictMixin[K, V] | None, None], None]
|
||||
def setdefault(self, key: K, default: V | None = None) -> V: ...
|
||||
@overload
|
||||
def pop(self, key: K) -> V: ...
|
||||
@overload
|
||||
def pop(self, key: K, default: V | T = ...) -> V | T: ...
|
||||
def __setitem__(self, key: K, value: V) -> None: ...
|
||||
def __delitem__(self, key: K) -> None: ...
|
||||
def clear(self) -> None: ...
|
||||
def popitem(self) -> tuple[K, V]: ...
|
||||
@overload
|
||||
def update(self, __m: SupportsKeysAndGetItem[K, V], **kwargs: V) -> None: ...
|
||||
@overload
|
||||
def update(self, __m: Iterable[tuple[K, V]], **kwargs: V) -> None: ...
|
||||
@overload
|
||||
def update(self, **kwargs: V) -> None: ...
|
||||
@@ -0,0 +1,180 @@
|
||||
from __future__ import annotations
|
||||
|
||||
|
||||
class IfRange:
|
||||
"""Very simple object that represents the `If-Range` header in parsed
|
||||
form. It will either have neither a etag or date or one of either but
|
||||
never both.
|
||||
|
||||
.. versionadded:: 0.7
|
||||
"""
|
||||
|
||||
def __init__(self, etag=None, date=None):
|
||||
#: The etag parsed and unquoted. Ranges always operate on strong
|
||||
#: etags so the weakness information is not necessary.
|
||||
self.etag = etag
|
||||
#: The date in parsed format or `None`.
|
||||
self.date = date
|
||||
|
||||
def to_header(self):
|
||||
"""Converts the object back into an HTTP header."""
|
||||
if self.date is not None:
|
||||
return http.http_date(self.date)
|
||||
if self.etag is not None:
|
||||
return http.quote_etag(self.etag)
|
||||
return ""
|
||||
|
||||
def __str__(self):
|
||||
return self.to_header()
|
||||
|
||||
def __repr__(self):
|
||||
return f"<{type(self).__name__} {str(self)!r}>"
|
||||
|
||||
|
||||
class Range:
|
||||
"""Represents a ``Range`` header. All methods only support only
|
||||
bytes as the unit. Stores a list of ranges if given, but the methods
|
||||
only work if only one range is provided.
|
||||
|
||||
:raise ValueError: If the ranges provided are invalid.
|
||||
|
||||
.. versionchanged:: 0.15
|
||||
The ranges passed in are validated.
|
||||
|
||||
.. versionadded:: 0.7
|
||||
"""
|
||||
|
||||
def __init__(self, units, ranges):
|
||||
#: The units of this range. Usually "bytes".
|
||||
self.units = units
|
||||
#: A list of ``(begin, end)`` tuples for the range header provided.
|
||||
#: The ranges are non-inclusive.
|
||||
self.ranges = ranges
|
||||
|
||||
for start, end in ranges:
|
||||
if start is None or (end is not None and (start < 0 or start >= end)):
|
||||
raise ValueError(f"{(start, end)} is not a valid range.")
|
||||
|
||||
def range_for_length(self, length):
|
||||
"""If the range is for bytes, the length is not None and there is
|
||||
exactly one range and it is satisfiable it returns a ``(start, stop)``
|
||||
tuple, otherwise `None`.
|
||||
"""
|
||||
if self.units != "bytes" or length is None or len(self.ranges) != 1:
|
||||
return None
|
||||
start, end = self.ranges[0]
|
||||
if end is None:
|
||||
end = length
|
||||
if start < 0:
|
||||
start += length
|
||||
if http.is_byte_range_valid(start, end, length):
|
||||
return start, min(end, length)
|
||||
return None
|
||||
|
||||
def make_content_range(self, length):
|
||||
"""Creates a :class:`~werkzeug.datastructures.ContentRange` object
|
||||
from the current range and given content length.
|
||||
"""
|
||||
rng = self.range_for_length(length)
|
||||
if rng is not None:
|
||||
return ContentRange(self.units, rng[0], rng[1], length)
|
||||
return None
|
||||
|
||||
def to_header(self):
|
||||
"""Converts the object back into an HTTP header."""
|
||||
ranges = []
|
||||
for begin, end in self.ranges:
|
||||
if end is None:
|
||||
ranges.append(f"{begin}-" if begin >= 0 else str(begin))
|
||||
else:
|
||||
ranges.append(f"{begin}-{end - 1}")
|
||||
return f"{self.units}={','.join(ranges)}"
|
||||
|
||||
def to_content_range_header(self, length):
|
||||
"""Converts the object into `Content-Range` HTTP header,
|
||||
based on given length
|
||||
"""
|
||||
range = self.range_for_length(length)
|
||||
if range is not None:
|
||||
return f"{self.units} {range[0]}-{range[1] - 1}/{length}"
|
||||
return None
|
||||
|
||||
def __str__(self):
|
||||
return self.to_header()
|
||||
|
||||
def __repr__(self):
|
||||
return f"<{type(self).__name__} {str(self)!r}>"
|
||||
|
||||
|
||||
def _callback_property(name):
|
||||
def fget(self):
|
||||
return getattr(self, name)
|
||||
|
||||
def fset(self, value):
|
||||
setattr(self, name, value)
|
||||
if self.on_update is not None:
|
||||
self.on_update(self)
|
||||
|
||||
return property(fget, fset)
|
||||
|
||||
|
||||
class ContentRange:
|
||||
"""Represents the content range header.
|
||||
|
||||
.. versionadded:: 0.7
|
||||
"""
|
||||
|
||||
def __init__(self, units, start, stop, length=None, on_update=None):
|
||||
assert http.is_byte_range_valid(start, stop, length), "Bad range provided"
|
||||
self.on_update = on_update
|
||||
self.set(start, stop, length, units)
|
||||
|
||||
#: The units to use, usually "bytes"
|
||||
units = _callback_property("_units")
|
||||
#: The start point of the range or `None`.
|
||||
start = _callback_property("_start")
|
||||
#: The stop point of the range (non-inclusive) or `None`. Can only be
|
||||
#: `None` if also start is `None`.
|
||||
stop = _callback_property("_stop")
|
||||
#: The length of the range or `None`.
|
||||
length = _callback_property("_length")
|
||||
|
||||
def set(self, start, stop, length=None, units="bytes"):
|
||||
"""Simple method to update the ranges."""
|
||||
assert http.is_byte_range_valid(start, stop, length), "Bad range provided"
|
||||
self._units = units
|
||||
self._start = start
|
||||
self._stop = stop
|
||||
self._length = length
|
||||
if self.on_update is not None:
|
||||
self.on_update(self)
|
||||
|
||||
def unset(self):
|
||||
"""Sets the units to `None` which indicates that the header should
|
||||
no longer be used.
|
||||
"""
|
||||
self.set(None, None, units=None)
|
||||
|
||||
def to_header(self):
|
||||
if self.units is None:
|
||||
return ""
|
||||
if self.length is None:
|
||||
length = "*"
|
||||
else:
|
||||
length = self.length
|
||||
if self.start is None:
|
||||
return f"{self.units} */{length}"
|
||||
return f"{self.units} {self.start}-{self.stop - 1}/{length}"
|
||||
|
||||
def __bool__(self):
|
||||
return self.units is not None
|
||||
|
||||
def __str__(self):
|
||||
return self.to_header()
|
||||
|
||||
def __repr__(self):
|
||||
return f"<{type(self).__name__} {str(self)!r}>"
|
||||
|
||||
|
||||
# circular dependencies
|
||||
from .. import http
|
||||
@@ -0,0 +1,57 @@
|
||||
from collections.abc import Callable
|
||||
from datetime import datetime
|
||||
|
||||
class IfRange:
|
||||
etag: str | None
|
||||
date: datetime | None
|
||||
def __init__(
|
||||
self, etag: str | None = None, date: datetime | None = None
|
||||
) -> None: ...
|
||||
def to_header(self) -> str: ...
|
||||
|
||||
class Range:
|
||||
units: str
|
||||
ranges: list[tuple[int, int | None]]
|
||||
def __init__(self, units: str, ranges: list[tuple[int, int | None]]) -> None: ...
|
||||
def range_for_length(self, length: int | None) -> tuple[int, int] | None: ...
|
||||
def make_content_range(self, length: int | None) -> ContentRange | None: ...
|
||||
def to_header(self) -> str: ...
|
||||
def to_content_range_header(self, length: int | None) -> str | None: ...
|
||||
|
||||
def _callback_property(name: str) -> property: ...
|
||||
|
||||
class ContentRange:
|
||||
on_update: Callable[[ContentRange], None] | None
|
||||
def __init__(
|
||||
self,
|
||||
units: str | None,
|
||||
start: int | None,
|
||||
stop: int | None,
|
||||
length: int | None = None,
|
||||
on_update: Callable[[ContentRange], None] | None = None,
|
||||
) -> None: ...
|
||||
@property
|
||||
def units(self) -> str | None: ...
|
||||
@units.setter
|
||||
def units(self, value: str | None) -> None: ...
|
||||
@property
|
||||
def start(self) -> int | None: ...
|
||||
@start.setter
|
||||
def start(self, value: int | None) -> None: ...
|
||||
@property
|
||||
def stop(self) -> int | None: ...
|
||||
@stop.setter
|
||||
def stop(self, value: int | None) -> None: ...
|
||||
@property
|
||||
def length(self) -> int | None: ...
|
||||
@length.setter
|
||||
def length(self, value: int | None) -> None: ...
|
||||
def set(
|
||||
self,
|
||||
start: int | None,
|
||||
stop: int | None,
|
||||
length: int | None = None,
|
||||
units: str | None = "bytes",
|
||||
) -> None: ...
|
||||
def unset(self) -> None: ...
|
||||
def to_header(self) -> str: ...
|
||||
File diff suppressed because it is too large
Load Diff
@@ -0,0 +1,208 @@
|
||||
from collections.abc import Callable
|
||||
from collections.abc import Iterable
|
||||
from collections.abc import Iterator
|
||||
from collections.abc import Mapping
|
||||
from typing import Any
|
||||
from typing import Generic
|
||||
from typing import Literal
|
||||
from typing import NoReturn
|
||||
from typing import overload
|
||||
from typing import TypeVar
|
||||
|
||||
from .mixins import (
|
||||
ImmutableDictMixin,
|
||||
ImmutableListMixin,
|
||||
ImmutableMultiDictMixin,
|
||||
UpdateDictMixin,
|
||||
)
|
||||
|
||||
D = TypeVar("D")
|
||||
K = TypeVar("K")
|
||||
T = TypeVar("T")
|
||||
V = TypeVar("V")
|
||||
_CD = TypeVar("_CD", bound="CallbackDict")
|
||||
|
||||
def is_immutable(self: object) -> NoReturn: ...
|
||||
def iter_multi_items(
|
||||
mapping: Mapping[K, V | Iterable[V]] | Iterable[tuple[K, V]]
|
||||
) -> Iterator[tuple[K, V]]: ...
|
||||
|
||||
class ImmutableList(ImmutableListMixin[V]): ...
|
||||
|
||||
class TypeConversionDict(dict[K, V]):
|
||||
@overload
|
||||
def get(self, key: K, default: None = ..., type: None = ...) -> V | None: ...
|
||||
@overload
|
||||
def get(self, key: K, default: D, type: None = ...) -> D | V: ...
|
||||
@overload
|
||||
def get(self, key: K, default: D, type: Callable[[V], T]) -> D | T: ...
|
||||
@overload
|
||||
def get(self, key: K, type: Callable[[V], T]) -> T | None: ...
|
||||
|
||||
class ImmutableTypeConversionDict(ImmutableDictMixin[K, V], TypeConversionDict[K, V]):
|
||||
def copy(self) -> TypeConversionDict[K, V]: ...
|
||||
def __copy__(self) -> ImmutableTypeConversionDict: ...
|
||||
|
||||
class MultiDict(TypeConversionDict[K, V]):
|
||||
def __init__(
|
||||
self,
|
||||
mapping: Mapping[K, Iterable[V] | V] | Iterable[tuple[K, V]] | None = None,
|
||||
) -> None: ...
|
||||
def __getitem__(self, item: K) -> V: ...
|
||||
def __setitem__(self, key: K, value: V) -> None: ...
|
||||
def add(self, key: K, value: V) -> None: ...
|
||||
@overload
|
||||
def getlist(self, key: K) -> list[V]: ...
|
||||
@overload
|
||||
def getlist(self, key: K, type: Callable[[V], T] = ...) -> list[T]: ...
|
||||
def setlist(self, key: K, new_list: Iterable[V]) -> None: ...
|
||||
def setdefault(self, key: K, default: V | None = None) -> V: ...
|
||||
def setlistdefault(
|
||||
self, key: K, default_list: Iterable[V] | None = None
|
||||
) -> list[V]: ...
|
||||
def items(self, multi: bool = False) -> Iterator[tuple[K, V]]: ... # type: ignore
|
||||
def lists(self) -> Iterator[tuple[K, list[V]]]: ...
|
||||
def values(self) -> Iterator[V]: ... # type: ignore
|
||||
def listvalues(self) -> Iterator[list[V]]: ...
|
||||
def copy(self) -> MultiDict[K, V]: ...
|
||||
def deepcopy(self, memo: Any = None) -> MultiDict[K, V]: ...
|
||||
@overload
|
||||
def to_dict(self) -> dict[K, V]: ...
|
||||
@overload
|
||||
def to_dict(self, flat: Literal[False]) -> dict[K, list[V]]: ...
|
||||
def update( # type: ignore
|
||||
self, mapping: Mapping[K, Iterable[V] | V] | Iterable[tuple[K, V]]
|
||||
) -> None: ...
|
||||
@overload
|
||||
def pop(self, key: K) -> V: ...
|
||||
@overload
|
||||
def pop(self, key: K, default: V | T = ...) -> V | T: ...
|
||||
def popitem(self) -> tuple[K, V]: ...
|
||||
def poplist(self, key: K) -> list[V]: ...
|
||||
def popitemlist(self) -> tuple[K, list[V]]: ...
|
||||
def __copy__(self) -> MultiDict[K, V]: ...
|
||||
def __deepcopy__(self, memo: Any) -> MultiDict[K, V]: ...
|
||||
|
||||
class _omd_bucket(Generic[K, V]):
|
||||
prev: _omd_bucket | None
|
||||
next: _omd_bucket | None
|
||||
key: K
|
||||
value: V
|
||||
def __init__(self, omd: OrderedMultiDict, key: K, value: V) -> None: ...
|
||||
def unlink(self, omd: OrderedMultiDict) -> None: ...
|
||||
|
||||
class OrderedMultiDict(MultiDict[K, V]):
|
||||
_first_bucket: _omd_bucket | None
|
||||
_last_bucket: _omd_bucket | None
|
||||
def __init__(self, mapping: Mapping[K, V] | None = None) -> None: ...
|
||||
def __eq__(self, other: object) -> bool: ...
|
||||
def __getitem__(self, key: K) -> V: ...
|
||||
def __setitem__(self, key: K, value: V) -> None: ...
|
||||
def __delitem__(self, key: K) -> None: ...
|
||||
def keys(self) -> Iterator[K]: ... # type: ignore
|
||||
def __iter__(self) -> Iterator[K]: ...
|
||||
def values(self) -> Iterator[V]: ... # type: ignore
|
||||
def items(self, multi: bool = False) -> Iterator[tuple[K, V]]: ... # type: ignore
|
||||
def lists(self) -> Iterator[tuple[K, list[V]]]: ...
|
||||
def listvalues(self) -> Iterator[list[V]]: ...
|
||||
def add(self, key: K, value: V) -> None: ...
|
||||
@overload
|
||||
def getlist(self, key: K) -> list[V]: ...
|
||||
@overload
|
||||
def getlist(self, key: K, type: Callable[[V], T] = ...) -> list[T]: ...
|
||||
def setlist(self, key: K, new_list: Iterable[V]) -> None: ...
|
||||
def setlistdefault(
|
||||
self, key: K, default_list: Iterable[V] | None = None
|
||||
) -> list[V]: ...
|
||||
def update( # type: ignore
|
||||
self, mapping: Mapping[K, V] | Iterable[tuple[K, V]]
|
||||
) -> None: ...
|
||||
def poplist(self, key: K) -> list[V]: ...
|
||||
@overload
|
||||
def pop(self, key: K) -> V: ...
|
||||
@overload
|
||||
def pop(self, key: K, default: V | T = ...) -> V | T: ...
|
||||
def popitem(self) -> tuple[K, V]: ...
|
||||
def popitemlist(self) -> tuple[K, list[V]]: ...
|
||||
|
||||
class CombinedMultiDict(ImmutableMultiDictMixin[K, V], MultiDict[K, V]): # type: ignore
|
||||
dicts: list[MultiDict[K, V]]
|
||||
def __init__(self, dicts: Iterable[MultiDict[K, V]] | None) -> None: ...
|
||||
@classmethod
|
||||
def fromkeys(cls, keys: Any, value: Any = None) -> NoReturn: ...
|
||||
def __getitem__(self, key: K) -> V: ...
|
||||
@overload # type: ignore
|
||||
def get(self, key: K) -> V | None: ...
|
||||
@overload
|
||||
def get(self, key: K, default: V | T = ...) -> V | T: ...
|
||||
@overload
|
||||
def get(
|
||||
self, key: K, default: T | None = None, type: Callable[[V], T] = ...
|
||||
) -> T | None: ...
|
||||
@overload
|
||||
def getlist(self, key: K) -> list[V]: ...
|
||||
@overload
|
||||
def getlist(self, key: K, type: Callable[[V], T] = ...) -> list[T]: ...
|
||||
def _keys_impl(self) -> set[K]: ...
|
||||
def keys(self) -> set[K]: ... # type: ignore
|
||||
def __iter__(self) -> set[K]: ... # type: ignore
|
||||
def items(self, multi: bool = False) -> Iterator[tuple[K, V]]: ... # type: ignore
|
||||
def values(self) -> Iterator[V]: ... # type: ignore
|
||||
def lists(self) -> Iterator[tuple[K, list[V]]]: ...
|
||||
def listvalues(self) -> Iterator[list[V]]: ...
|
||||
def copy(self) -> MultiDict[K, V]: ...
|
||||
@overload
|
||||
def to_dict(self) -> dict[K, V]: ...
|
||||
@overload
|
||||
def to_dict(self, flat: Literal[False]) -> dict[K, list[V]]: ...
|
||||
def __contains__(self, key: K) -> bool: ... # type: ignore
|
||||
def has_key(self, key: K) -> bool: ...
|
||||
|
||||
class ImmutableDict(ImmutableDictMixin[K, V], dict[K, V]):
|
||||
def copy(self) -> dict[K, V]: ...
|
||||
def __copy__(self) -> ImmutableDict[K, V]: ...
|
||||
|
||||
class ImmutableMultiDict( # type: ignore
|
||||
ImmutableMultiDictMixin[K, V], MultiDict[K, V]
|
||||
):
|
||||
def copy(self) -> MultiDict[K, V]: ...
|
||||
def __copy__(self) -> ImmutableMultiDict[K, V]: ...
|
||||
|
||||
class ImmutableOrderedMultiDict( # type: ignore
|
||||
ImmutableMultiDictMixin[K, V], OrderedMultiDict[K, V]
|
||||
):
|
||||
def _iter_hashitems(self) -> Iterator[tuple[int, tuple[K, V]]]: ...
|
||||
def copy(self) -> OrderedMultiDict[K, V]: ...
|
||||
def __copy__(self) -> ImmutableOrderedMultiDict[K, V]: ...
|
||||
|
||||
class CallbackDict(UpdateDictMixin[K, V], dict[K, V]):
|
||||
def __init__(
|
||||
self,
|
||||
initial: Mapping[K, V] | Iterable[tuple[K, V]] | None = None,
|
||||
on_update: Callable[[_CD], None] | None = None,
|
||||
) -> None: ...
|
||||
|
||||
class HeaderSet(set[str]):
|
||||
_headers: list[str]
|
||||
_set: set[str]
|
||||
on_update: Callable[[HeaderSet], None] | None
|
||||
def __init__(
|
||||
self,
|
||||
headers: Iterable[str] | None = None,
|
||||
on_update: Callable[[HeaderSet], None] | None = None,
|
||||
) -> None: ...
|
||||
def add(self, header: str) -> None: ...
|
||||
def remove(self, header: str) -> None: ...
|
||||
def update(self, iterable: Iterable[str]) -> None: ... # type: ignore
|
||||
def discard(self, header: str) -> None: ...
|
||||
def find(self, header: str) -> int: ...
|
||||
def index(self, header: str) -> int: ...
|
||||
def clear(self) -> None: ...
|
||||
def as_set(self, preserve_casing: bool = False) -> set[str]: ...
|
||||
def to_header(self) -> str: ...
|
||||
def __getitem__(self, idx: int) -> str: ...
|
||||
def __delitem__(self, idx: int) -> None: ...
|
||||
def __setitem__(self, idx: int, value: str) -> None: ...
|
||||
def __contains__(self, header: str) -> bool: ... # type: ignore
|
||||
def __len__(self) -> int: ...
|
||||
def __iter__(self) -> Iterator[str]: ...
|
||||
Reference in New Issue
Block a user