Source code for limits.storage.memcached

import inspect
import threading
import time

from six.moves import urllib

from ..errors import ConfigurationError
from ..util import get_dependency
from .base import Storage


[docs]class MemcachedStorage(Storage): """ Rate limit storage with memcached as backend. Depends on the `pymemcache` library. """ MAX_CAS_RETRIES = 10 STORAGE_SCHEME = ["memcached"] def __init__(self, uri, **options): """ :param str uri: memcached location of the form `memcached://host:port,host:port`, `memcached:///var/tmp/path/to/sock` :param options: all remaining keyword arguments are passed directly to the constructor of :class:`pymemcache.client.base.Client` :raise ConfigurationError: when `pymemcache` is not available """ parsed = urllib.parse.urlparse(uri) self.hosts = [] for loc in parsed.netloc.strip().split(","): if not loc: continue host, port = loc.split(":") self.hosts.append((host, int(port))) else: # filesystem path to UDS if parsed.path and not parsed.netloc and not parsed.port: self.hosts = [parsed.path] self.library = options.pop("library", "pymemcache.client") self.cluster_library = options.pop("cluster_library", "pymemcache.client.hash") self.client_getter = options.pop("client_getter", self.get_client) self.options = options if not get_dependency(self.library): raise ConfigurationError( "memcached prerequisite not available." " please install %s" % self.library ) # pragma: no cover self.local_storage = threading.local() self.local_storage.storage = None
[docs] def get_client(self, module, hosts, **kwargs): """ returns a memcached client. :param module: the memcached module :param hosts: list of memcached hosts :return: """ return ( module.HashClient(hosts, **kwargs) if len(hosts) > 1 else module.Client(*hosts, **kwargs) )
def call_memcached_func(self, func, *args, **kwargs): if "noreply" in kwargs: argspec = inspect.getargspec(func) if not ("noreply" in argspec.args or argspec.keywords): kwargs.pop("noreply") # noqa return func(*args, **kwargs) @property def storage(self): """ lazily creates a memcached client instance using a thread local """ if not (hasattr(self.local_storage, "storage") and self.local_storage.storage): self.local_storage.storage = self.client_getter( get_dependency( self.cluster_library if len(self.hosts) > 1 else self.library ), self.hosts, **self.options ) return self.local_storage.storage
[docs] def get(self, key): """ :param str key: the key to get the counter value for """ return int(self.storage.get(key) or 0)
[docs] def clear(self, key): """ :param str key: the key to clear rate limits for """ self.storage.delete(key)
[docs] def incr(self, key, expiry, elastic_expiry=False): """ increments the counter for a given rate limit key :param str key: the key to increment :param int expiry: amount in seconds for the key to expire in :param bool elastic_expiry: whether to keep extending the rate limit window every hit. """ if not self.call_memcached_func( self.storage.add, key, 1, expiry, noreply=False ): if elastic_expiry: value, cas = self.storage.gets(key) retry = 0 while ( not self.call_memcached_func( self.storage.cas, key, int(value or 0) + 1, cas, expiry ) and retry < self.MAX_CAS_RETRIES ): value, cas = self.storage.gets(key) retry += 1 self.call_memcached_func( self.storage.set, key + "/expires", expiry + time.time(), expire=expiry, noreply=False, ) return int(value or 0) + 1 else: return self.storage.incr(key, 1) self.call_memcached_func( self.storage.set, key + "/expires", expiry + time.time(), expire=expiry, noreply=False, ) return 1
[docs] def get_expiry(self, key): """ :param str key: the key to get the expiry for """ return int(float(self.storage.get(key + "/expires") or time.time()))
[docs] def check(self): """ check if storage is healthy """ try: self.call_memcached_func(self.storage.get, "limiter-check") return True except: # noqa return False