import urllib
import warnings
from typing import cast
from deprecated.sphinx import versionchanged
from packaging.version import Version
from limits.errors import ConfigurationError
from limits.storage.redis import RedisStorage
from limits.typing import Dict, List, Optional, Tuple, Union
[docs]
@versionchanged(
version="2.5.0",
reason="""
Cluster support was provided by the :pypi:`redis-py-cluster` library
which has been absorbed into the official :pypi:`redis` client. By
default the :class:`redis.cluster.RedisCluster` client will be used
however if the version of the package is lower than ``4.2.0`` the implementation
will fallback to trying to use :class:`rediscluster.RedisCluster`.
""",
)
class RedisClusterStorage(RedisStorage):
"""
Rate limit storage with redis cluster as backend
Depends on :pypi:`redis`.
"""
STORAGE_SCHEME = ["redis+cluster"]
"""The storage scheme for redis cluster"""
DEFAULT_OPTIONS: Dict[str, Union[float, str, bool]] = {
"max_connections": 1000,
}
"Default options passed to the :class:`~redis.cluster.RedisCluster`"
DEPENDENCIES = {
"redis": Version("4.2.0"),
"rediscluster": Version("2.0.0"), # Deprecated since 2.6.0
}
def __init__(self, uri: str, **options: Union[float, str, bool]) -> None:
"""
:param uri: url of the form
``redis+cluster://[:password]@host:port,host:port``
:param options: all remaining keyword arguments are passed
directly to the constructor of :class:`redis.cluster.RedisCluster`
:raise ConfigurationError: when the :pypi:`redis` library is not
available or if the redis cluster cannot be reached.
"""
parsed = urllib.parse.urlparse(uri)
parsed_auth: Dict[str, Union[float, str, bool]] = {}
if parsed.username:
parsed_auth["username"] = parsed.username
if parsed.password:
parsed_auth["password"] = parsed.password
sep = parsed.netloc.find("@") + 1
cluster_hosts = []
for loc in parsed.netloc[sep:].split(","):
host, port = loc.split(":")
cluster_hosts.append((host, int(port)))
self.storage = None
self.using_redis_py = False
self.__pick_storage(
cluster_hosts, **{**self.DEFAULT_OPTIONS, **parsed_auth, **options}
)
assert self.storage
self.initialize_storage(uri)
super(RedisStorage, self).__init__(uri, **options)
def __pick_storage(
self, cluster_hosts: List[Tuple[str, int]], **options: Union[float, str, bool]
) -> None:
try:
redis_py = self.dependencies["redis"].module
startup_nodes = [redis_py.cluster.ClusterNode(*c) for c in cluster_hosts]
self.storage = redis_py.cluster.RedisCluster(
startup_nodes=startup_nodes, **options
)
self.using_redis_py = True
return
except ConfigurationError: # pragma: no cover
self.__use_legacy_cluster_implementation(cluster_hosts, **options)
if not self.storage:
raise ConfigurationError(
(
"Unable to find an implementation for redis cluster"
" Cluster support requires either redis-py>=4.2 or"
" redis-py-cluster"
)
)
def __use_legacy_cluster_implementation(
self, cluster_hosts: List[Tuple[str, int]], **options: Union[float, str, bool]
) -> None: # pragma: no cover
redis_cluster = self.dependencies["rediscluster"].module
warnings.warn(
(
"Using redis-py-cluster is deprecated as the library has been"
" absorbed by redis-py (>=4.2). The support will be eventually "
" removed from the limits library and is no longer tested "
" against since version: 2.6. To get rid of this warning, "
" uninstall redis-py-cluster and ensure redis-py>=4.2.0 is installed"
)
)
self.storage = redis_cluster.RedisCluster(
startup_nodes=[{"host": c[0], "port": c[1]} for c in cluster_hosts],
**options,
)
[docs]
def reset(self) -> Optional[int]:
"""
Redis Clusters are sharded and deleting across shards
can't be done atomically. Because of this, this reset loops over all
keys that are prefixed with `self.PREFIX` and calls delete on them,
one at a time.
.. warning::
This operation was not tested with extremely large data sets.
On a large production based system, care should be taken with its
usage as it could be slow on very large data sets"""
prefix = self.prefixed_key("*")
if self.using_redis_py:
count = 0
for primary in self.storage.get_primaries():
node = self.storage.get_redis_connection(primary)
keys = node.keys(prefix)
count += sum([node.delete(k.decode("utf-8")) for k in keys])
return count
else: # pragma: no cover
keys = self.storage.keys(prefix)
return cast(
int, sum([self.storage.delete(k.decode("utf-8")) for k in keys])
)