@versionchanged(
    version="3.14.0",
    reason="""
Dropped support for the :pypi:`redis-py-cluster` library
which has been abandoned/deprecated.
""",
)
@versionchanged(
    version="2.5.0",
    reason="""
Cluster support was provided by the :pypi:`redis-py-cluster` library
which has been absorbed into the official :pypi:`redis` client. By
default the :class:`redis.cluster.RedisCluster` client will be used
however if the version of the package is lower than ``4.2.0`` the implementation
will fallback to trying to use :class:`rediscluster.RedisCluster`.
""",
)
@versionchanged(
    version="4.3",
    reason=(
        "Added support for using the redis client from :pypi:`valkey`"
        " if :paramref:`uri` has the ``valkey+cluster://`` schema"
    ),
)
class RedisClusterStorage(RedisStorage):
    """
    Rate limit storage with redis cluster as backend

    Depends on :pypi:`redis` (or :pypi:`valkey` if :paramref:`uri`
    starts with ``valkey+cluster://``).
    """

    STORAGE_SCHEME = ["redis+cluster", "valkey+cluster"]
    """The storage scheme for redis cluster"""

    DEFAULT_OPTIONS: dict[str, float | str | bool] = {
        "max_connections": 1000,
    }
    "Default options passed to the :class:`~redis.cluster.RedisCluster`"

    DEPENDENCIES = {
        "redis": Version("4.2.0"),
        "valkey": Version("6.0"),
    }

    def __init__(
        self,
        uri: str,
        key_prefix: str = RedisStorage.PREFIX,
        wrap_exceptions: bool = False,
        **options: float | str | bool,
    ) -> None:
        """
        :param uri: url of the form
         ``redis+cluster://[:password]@host:port,host:port``

         If the uri scheme is ``valkey+cluster`` the implementation used will be from
         :pypi:`valkey`.
        :param key_prefix: the prefix for each key created in redis
        :param wrap_exceptions: Whether to wrap storage exceptions in
         :exc:`limits.errors.StorageError` before raising it.
        :param options: all remaining keyword arguments are passed
         directly to the constructor of :class:`redis.cluster.RedisCluster`
        :raise ConfigurationError: when the :pypi:`redis` library is not
         available or if the redis cluster cannot be reached.
        """
        storage_uri_options = parse_storage_uri(uri)

        # Credentials given as keyword options take precedence over the ones
        # embedded in the uri; empty/missing values are not forwarded here.
        parsed_auth = {}
        if username := options.get("username", storage_uri_options.username):
            parsed_auth["username"] = username
        if password := options.get("password", storage_uri_options.password):
            parsed_auth["password"] = password

        cluster_hosts = storage_uri_options.locations

        self.key_prefix = key_prefix
        self.storage = None
        self.target_server = "valkey" if uri.startswith("valkey") else "redis"
        self.dependency = self.dependencies[self.target_server].module

        startup_nodes = [
            self.dependency.cluster.ClusterNode(*c) for c in cluster_hosts
        ]
        # Later entries win: caller supplied options override both the class
        # defaults and the values derived from the uri.
        merged_options = {
            **self.DEFAULT_OPTIONS,
            "startup_nodes": startup_nodes,
            **parsed_auth,
            **options,
        }

        if self.target_server == "redis":
            self.storage = self.dependency.cluster.RedisCluster(**merged_options)
        else:
            self.storage = self.dependency.cluster.ValkeyCluster(**merged_options)
        assert self.storage
        self.initialize_storage(uri)
        # Start the MRO lookup *after* RedisStorage so that its __init__
        # is skipped; the client has already been set up above.
        super(RedisStorage, self).__init__(uri, wrap_exceptions, **options)

    def reset(self) -> int | None:
        """
        Redis Clusters are sharded and deleting across shards
        can't be done atomically. Because of this, this reset loops over all
        keys that are prefixed with :paramref:`RedisClusterStorage.prefix` and
        calls delete on them one at a time.

        .. warning::
         This operation was not tested with extremely large data sets.
         On a large production based system, care should be taken with its
         usage as it could be slow on very large data sets

        :return: the sum of the values returned by the per-key delete calls
         across every primary node.
        """
        prefix = self.prefixed_key("*")
        count = 0
        for primary in self.storage.get_primaries():
            node = self.storage.get_redis_connection(primary)
            # Hand each key back exactly as the client returned it. Depending
            # on the ``decode_responses`` option the keys are ``bytes`` or
            # ``str``; unconditionally calling ``.decode`` breaks on ``str``.
            count += sum(node.delete(key) for key in node.keys(prefix))
        return count