var/opt/nydus/ops/primordial/cacheutils.py000064400000002572147205263650014761 0ustar00# -*- coding: utf-8 -*- from functools import lru_cache, wraps from datetime import datetime, timedelta DEFAULT_MAXSIZE = 128 def memoize(obj): """A general-use memoizer decorator for class, object, function; supports kwargs""" cache = obj.cache = {} @wraps(obj) def memoizer(*args, **kwargs): """Actual implementation""" key = str(args) + str(kwargs) if key not in cache: cache[key] = obj(*args, **kwargs) return cache[key] return memoizer def timed_lru_cache(seconds: int, maxsize: int = DEFAULT_MAXSIZE): """A decorator that wraps lru_cache and allows the setting of a lifetime in seconds, after which the cache will expire :param seconds: The number of seconds after which the cache will expire :param maxsize: The maximum number of entries in the cache before it will start dropping old entries """ def wrapper_cache(func): func = lru_cache(maxsize=maxsize)(func) func.lifetime = timedelta(seconds=seconds) func.expiration = datetime.utcnow() + func.lifetime @wraps(func) def wrapped_func(*args, **kwargs): if datetime.utcnow() >= func.expiration: func.cache_clear() func.expiration = datetime.utcnow() + func.lifetime return func(*args, **kwargs) return wrapped_func return wrapper_cache