Source code for limits.util

""" """

import dataclasses
import re
import sys
from collections import UserDict
from types import ModuleType
from typing import TYPE_CHECKING, cast

import importlib_resources
from packaging.version import Version

from limits.typing import Dict, List, NamedTuple, Optional, Tuple, Type, Union

from .errors import ConfigurationError
from .limits import GRANULARITIES, RateLimitItem

SEPARATORS = re.compile(r"[,;|]{1}")
SINGLE_EXPR = re.compile(
    r"""
    \s*([0-9]+)
    \s*(/|\s*per\s*)
    \s*([0-9]+)
    *\s*(hour|minute|second|day|month|year)s?\s*""",
    re.IGNORECASE | re.VERBOSE,
)
EXPR = re.compile(
    r"^{SINGLE}(:?{SEPARATORS}{SINGLE})*$".format(
        SINGLE=SINGLE_EXPR.pattern, SEPARATORS=SEPARATORS.pattern
    ),
    re.IGNORECASE | re.VERBOSE,
)


[docs] class WindowStats(NamedTuple): """ Tuple to describe a rate limited window """ #: Time as seconds since the Epoch when this window will be reset reset_time: int #: Quantity remaining in this window remaining: int
@dataclasses.dataclass class Dependency: name: str version_required: Optional[Version] version_found: Optional[Version] module: ModuleType if TYPE_CHECKING: _UserDict = UserDict[str, Dependency] else: _UserDict = UserDict class DependencyDict(_UserDict): Missing = Dependency("Missing", None, None, ModuleType("Missing")) def __getitem__(self, key: str) -> Dependency: dependency = super().__getitem__(key) if dependency == DependencyDict.Missing: raise ConfigurationError(f"{key} prerequisite not available") elif dependency.version_required and ( not dependency.version_found or dependency.version_found < dependency.version_required ): raise ConfigurationError( f"The minimum version of {dependency.version_required}" f" of {dependency.name} could not be found" ) return dependency class LazyDependency: """ Simple utility that provides an :attr:`dependency` to the child class to fetch any dependencies without having to import them explicitly. """ DEPENDENCIES: Union[Dict[str, Optional[Version]], List[str]] = [] """ The python modules this class has a dependency on. Used to lazily populate the :attr:`dependencies` """ def __init__(self) -> None: self._dependencies: DependencyDict = DependencyDict() @property def dependencies(self) -> DependencyDict: """ Cached mapping of the modules this storage depends on. This is done so that the module is only imported lazily when the storage is instantiated. :meta private: """ if not getattr(self, "_dependencies", None): dependencies = DependencyDict() mapping: Dict[str, Optional[Version]] if isinstance(self.DEPENDENCIES, list): mapping = {dependency: None for dependency in self.DEPENDENCIES} else: mapping = self.DEPENDENCIES for name, minimum_version in mapping.items(): dependency, version = get_dependency(name) if not dependency: dependencies[name] = DependencyDict.Missing else: dependencies[name] = Dependency( name, minimum_version, version, dependency ) self._dependencies = dependencies return self._dependencies def get_dependency(module_path: str) -> Tuple[Optional[ModuleType], Optional[Version]]: """ safe function to import a module at runtime """ try: if module_path not in sys.modules: __import__(module_path) root = module_path.split(".")[0] version = getattr(sys.modules[root], "__version__", "0.0.0") return sys.modules[module_path], Version(version) except ImportError: # pragma: no cover return None, None def get_package_data(path: str) -> bytes: return cast(bytes, importlib_resources.files("limits").joinpath(path).read_bytes())
[docs] def parse_many(limit_string: str) -> List[RateLimitItem]: """ parses rate limits in string notation containing multiple rate limits (e.g. ``1/second; 5/minute``) :param limit_string: rate limit string using :ref:`ratelimit-string` :raise ValueError: if the string notation is invalid. """ if not (isinstance(limit_string, str) and EXPR.match(limit_string)): raise ValueError("couldn't parse rate limit string '%s'" % limit_string) limits = [] for limit in SEPARATORS.split(limit_string): match = SINGLE_EXPR.match(limit) if match: amount, _, multiples, granularity_string = match.groups() granularity = granularity_from_string(granularity_string) limits.append( granularity(int(amount), multiples and int(multiples) or None) ) return limits
[docs] def parse(limit_string: str) -> RateLimitItem: """ parses a single rate limit in string notation (e.g. ``1/second`` or ``1 per second``) :param limit_string: rate limit string using :ref:`ratelimit-string` :raise ValueError: if the string notation is invalid. """ return list(parse_many(limit_string))[0]
def granularity_from_string(granularity_string: str) -> Type[RateLimitItem]: """ :param granularity_string: :raise ValueError: """ for granularity in GRANULARITIES.values(): if granularity.check_granularity_string(granularity_string): return granularity raise ValueError("no granularity matched for %s" % granularity_string)