Source code for limits.strategies

"""
rate limiting strategies
"""

from abc import ABCMeta, abstractmethod
import weakref
import six


[docs]@six.add_metaclass(ABCMeta) class RateLimiter(object): def __init__(self, storage): self.storage = weakref.ref(storage)
[docs] @abstractmethod def hit(self, item, *identifiers): """ creates a hit on the rate limit and returns True if successful. :param item: a :class:`RateLimitItem` instance :param identifiers: variable list of strings to uniquely identify the limit :return: True/False """ raise NotImplementedError
[docs] @abstractmethod def test(self, item, *identifiers): """ checks the rate limit and returns True if it is not currently exceeded. :param item: a :class:`RateLimitItem` instance :param identifiers: variable list of strings to uniquely identify the limit :return: True/False """ raise NotImplementedError
[docs] @abstractmethod def get_window_stats(self, item, *identifiers): """ returns the number of requests remaining and reset of this limit. :param item: a :class:`RateLimitItem` instance :param identifiers: variable list of strings to uniquely identify the limit :return: tuple (reset time (int), remaining (int)) """ raise NotImplementedError
def clear(self, item, *identifiers): return self.storage().clear(item.key_for(*identifiers))
[docs]class MovingWindowRateLimiter(RateLimiter): """ Reference: :ref:`moving-window` """ def __init__(self, storage): if not ( hasattr(storage, "acquire_entry") or hasattr(storage, "get_moving_window") ): raise NotImplementedError( "MovingWindowRateLimiting is not implemented for storage " "of type %s" % storage.__class__ ) super(MovingWindowRateLimiter, self).__init__(storage)
[docs] def hit(self, item, *identifiers): """ creates a hit on the rate limit and returns True if successful. :param item: a :class:`RateLimitItem` instance :param identifiers: variable list of strings to uniquely identify the limit :return: True/False """ return self.storage().acquire_entry( item.key_for(*identifiers), item.amount, item.get_expiry() )
[docs] def test(self, item, *identifiers): """ checks the rate limit and returns True if it is not currently exceeded. :param item: a :class:`RateLimitItem` instance :param identifiers: variable list of strings to uniquely identify the limit :return: True/False """ return ( self.storage().get_moving_window( item.key_for(*identifiers), item.amount, item.get_expiry(), )[1] < item.amount )
[docs] def get_window_stats(self, item, *identifiers): """ returns the number of requests remaining within this limit. :param item: a :class:`RateLimitItem` instance :param identifiers: variable list of strings to uniquely identify the limit :return: tuple (reset time (int), remaining (int)) """ window_start, window_items = self.storage().get_moving_window( item.key_for(*identifiers), item.amount, item.get_expiry() ) reset = window_start + item.get_expiry() return (reset, item.amount - window_items)
[docs]class FixedWindowRateLimiter(RateLimiter): """ Reference: :ref:`fixed-window` """
[docs] def hit(self, item, *identifiers): """ creates a hit on the rate limit and returns True if successful. :param item: a :class:`RateLimitItem` instance :param identifiers: variable list of strings to uniquely identify the limit :return: True/False """ return ( self.storage().incr(item.key_for(*identifiers), item.get_expiry()) <= item.amount )
[docs] def test(self, item, *identifiers): """ checks the rate limit and returns True if it is not currently exceeded. :param item: a :class:`RateLimitItem` instance :param identifiers: variable list of strings to uniquely identify the limit :return: True/False """ return self.storage().get(item.key_for(*identifiers)) < item.amount
[docs] def get_window_stats(self, item, *identifiers): """ returns the number of requests remaining and reset of this limit. :param item: a :class:`RateLimitItem` instance :param identifiers: variable list of strings to uniquely identify the limit :return: tuple (reset time (int), remaining (int)) """ remaining = max(0, item.amount - self.storage().get(item.key_for(*identifiers))) reset = self.storage().get_expiry(item.key_for(*identifiers)) return (reset, remaining)
[docs]class FixedWindowElasticExpiryRateLimiter(FixedWindowRateLimiter): """ Reference: :ref:`fixed-window-elastic` """
[docs] def hit(self, item, *identifiers): """ creates a hit on the rate limit and returns True if successful. :param item: a :class:`RateLimitItem` instance :param identifiers: variable list of strings to uniquely identify the limit :return: True/False """ return ( self.storage().incr(item.key_for(*identifiers), item.get_expiry(), True) <= item.amount )
STRATEGIES = { "fixed-window": FixedWindowRateLimiter, "fixed-window-elastic-expiry": FixedWindowElasticExpiryRateLimiter, "moving-window": MovingWindowRateLimiter, }