# Source code for anycache

"""Cache any python object to file."""

import collections
import functools
import hashlib
import logging
import pathlib
import shutil
import sys
import tempfile

import dill as pickle  # improved pickle
import filelock

__all__ = ("AnyCache", "anycache", "get_defaultcache")

# One cache entry: its identifier, the data file path, the dependency file
# path and the file lock guarding both files.
_CacheEntry = collections.namedtuple("_CacheEntry", ("ident", "data", "dep", "lock"))
# A cache entry plus the modification time and total size of its files.
_CacheEntryInfo = collections.namedtuple("_CacheEntryInfo", ("ce", "mtime", "size"))
# A cached call: the function, its arguments and the optional dependency
# file function. The typename matches the bound name (like its siblings) so
# that repr() and by-name lookups resolve to this module attribute.
_FuncInfo = collections.namedtuple("_FuncInfo", ("func", "args", "kwargs", "depfilefunc"))

if sys.version_info[0] >= 3:  # pragma: no cover

    def _bytes(name):
        """Return the text `name` encoded as UTF-8 bytes."""
        return bytes(name, encoding="utf-8")

else:  # pragma: no cover
    # Legacy interpreter: `bytes` is the plain string type and there is no
    # dedicated FileExistsError, so alias both.
    _bytes = bytes
    # pylint: disable=redefined-builtin
    FileExistsError = OSError


# File name suffixes of the files belonging to one cache entry. All files of
# an entry share the same stem (the entry identifier) inside the cache dir.
_CACHE_SUFFIX = ".cache"  # pickled function result
_DEP_SUFFIX = ".dep"  # dependency file paths, one per line
_LOCK_SUFFIX = ".lock"  # file lock guarding the entry


class _CacheInfo:

    """
    Cache Information Container.

    Scans `cachedir` for cache data files and collects one cache entry and
    one cache entry info per data file, plus the summed size of all entries.
    """

    def __init__(self, cachedir):
        datafilepaths = cachedir.glob("*%s" % _CACHE_SUFFIX)
        self.cacheentries = [_CacheInfo.create_ce_from_datafilepath(d) for d in datafilepaths]
        self.cacheentryinfos = [_CacheInfo.create_cei(ce) for ce in self.cacheentries]
        self.totalsize = sum(cei.size for cei in self.cacheentryinfos)

    @staticmethod
    def create_ce_from_ident(cachedir, ident):
        """Create Cache Entry from Identifier."""
        data = cachedir / (ident + _CACHE_SUFFIX)
        dep = cachedir / (ident + _DEP_SUFFIX)
        lock = filelock.FileLock(str(cachedir / (ident + _LOCK_SUFFIX)))
        return _CacheEntry(ident, data, dep, lock)

    @staticmethod
    def create_ce_from_datafilepath(datafilepath):
        """
        Create Cache Entry from filepath.

        The identifier is the file name without the cache suffix, so that the
        entry equals the one `create_ce_from_ident` builds for the same files.
        """
        # `.stem` drops the trailing suffix; `.name` would leave ".cache"
        # attached and yield an identifier differing from the ident-based one.
        ident = datafilepath.stem
        data = datafilepath
        dep = datafilepath.with_suffix(_DEP_SUFFIX)
        lock = filelock.FileLock(str(datafilepath.with_suffix(_LOCK_SUFFIX)))
        return _CacheEntry(ident, data, dep, lock)

    @staticmethod
    def create_cei(ce):
        """
        Create Cache Entry Info.

        The size is the sum of the data file size and the dependency file
        size; the modification time is the one of the data file.
        """
        # stat the data file once and reuse the result for mtime and size
        datastat = ce.data.stat()
        size = datastat.st_size + ce.dep.stat().st_size
        return _CacheEntryInfo(ce, datastat.st_mtime, size)


class AnyCache:
    """
    Cache for python objects.

    Keyword Args:
        cachedir: Directory for cached python objects. :any:`AnyCache`
                  instances on the same `cachedir` share the same cache.
        maxsize: Maximum cache size in bytes.
                 `None` does not limit the cache size.
                 `0` disables caching.
                 If the maximum size is smaller than the last cached object,
                 this object is kept.
                 During object write the cache size might be larger than
                 `maxsize`. At maximum twice as large as the maximum object
                 size.

    The :any:`AnyCache` instance mainly serves the :any:`AnyCache.anycache`
    method for caching the result of functions.

    >>> from anycache import AnyCache
    >>> ac = AnyCache()
    >>> @ac.anycache()
    ... def myfunc(posarg, kwarg=3):
    ...     print(" Calcing %r + %r = %r" % (posarg, kwarg, posarg + kwarg))
    ...     return posarg + kwarg
    >>> myfunc(4, 5)
     Calcing 4 + 5 = 9
    9
    >>> myfunc(4, 5)
    9
    >>> myfunc(4, 2)
     Calcing 4 + 2 = 6
    6

    The cache size is returned by :any:`AnyCache.size`.

    >>> ac.size
    10

    The cache size can be limited via `maxsize`.
    A `maxsize` of `0` disables caching.

    >>> ac.maxsize = 0
    >>> myfunc(4, 5)
     Calcing 4 + 5 = 9
    9

    The cache is preserved in this case, and needs to be cleared explicitly:

    >>> ac.size
    10
    >>> ac.clear()
    >>> ac.size
    0
    """

    def __init__(self, cachedir=None, maxsize=None):
        self.cachedir = cachedir
        self.maxsize = maxsize

    @property
    def cachedir(self):
        """
        Cache directory use for all cache files.

        :any:`AnyCache` instances on the same `cachedir` share the same cache.
        """
        # Without an explicit directory a temporary one is created on first use.
        if self.__cachedir is None:
            self.__cachedir = pathlib.Path(tempfile.mkdtemp(suffix=".anycache"))
        return self.__cachedir

    @cachedir.setter
    def cachedir(self, cachedir):
        if cachedir is not None:
            self.__cachedir = pathlib.Path(cachedir)
            self.__explicit_cachedir = True
        else:
            self.__cachedir = None
            self.__explicit_cachedir = False

    def __del__(self):
        # Only a self-created temporary cache directory is removed. The
        # attribute is missing when `__init__` failed before the setter
        # finished; such an instance owns nothing, so treat it as explicit.
        if not getattr(self, "_AnyCache__explicit_cachedir", True):
            self.clear()

    def anycache(self, depfilefunc=None):
        """
        Decorator to cache result of function depending on arguments.

        The decorated function keeps the name and docstring of the original
        function and additionally offers `is_outdated`, `remove` and
        `get_ident`, each taking the same arguments as the function.

        Keyword Args:
            depfilefunc: Dependency file function (see example below)

        >>> from anycache import AnyCache
        >>> ac = AnyCache()
        >>> @ac.anycache()
        ... def myfunc(posarg, kwarg=3):
        ...     print(" Calcing %r + %r = %r" % (posarg, kwarg, posarg + kwarg))
        ...     return posarg + kwarg
        >>> myfunc(2, 5)
         Calcing 2 + 5 = 7
        7
        >>> myfunc(2, 5)
        7

        File I/O is not tracked by the decorator.
        Instead a function needs to be implemented, which returns the paths
        of the files, which influence the function result.
        The `depfilefunc` is called with the function result and all
        arguments.
        The following example, depends on the path of the source code itself:

        >>> def mydepfilefunc(result, posarg, kwarg=3):
        ...     print(" Deps of %r + %r = %r" % (posarg, kwarg, result))
        ...     return [__file__]
        >>> @ac.anycache(depfilefunc=mydepfilefunc)
        ... def myfunc(posarg, kwarg=3):
        ...     print(" Calcing %r + %r = %r" % (posarg, kwarg, posarg + kwarg))
        ...     return posarg + kwarg
        >>> myfunc(2, 7)
         Calcing 2 + 7 = 9
         Deps of 2 + 7 = 9
        9
        """

        def decorator(func):
            def is_outdated(*args, **kwargs):
                funcinfo = _FuncInfo(func, args, kwargs, depfilefunc)
                return self._is_outdated(funcinfo)

            def remove(*args, **kwargs):
                funcinfo = _FuncInfo(func, args, kwargs, depfilefunc)
                return self._remove(funcinfo)

            def get_ident(*args, **kwargs):
                funcinfo = _FuncInfo(func, args, kwargs, depfilefunc)
                return self._get_ident(funcinfo)

            # Preserve __name__, __doc__, ... of the decorated function.
            @functools.wraps(func)
            def wrapped(*args, **kwargs):
                # `maxsize` is evaluated per call, so caching can be switched
                # off and on after decoration.
                if self.maxsize == 0:
                    return func(*args, **kwargs)
                funcinfo = _FuncInfo(func, args, kwargs, depfilefunc)
                return self._anycache(funcinfo)

            wrapped.is_outdated = is_outdated
            wrapped.remove = remove
            wrapped.get_ident = get_ident
            return wrapped

        return decorator

    def is_outdated(self, func, *args, **kwargs):
        """
        Return `True` if cache is outdated for `func` used with `args` and `kwargs`.

        `func` must be a function decorated by :any:`AnyCache.anycache`.

        >>> from anycache import AnyCache
        >>> ac = AnyCache()
        >>> @ac.anycache()
        ... def myfunc(posarg, kwarg=3):
        ...     print(" Calcing %r + %r = %r" % (posarg, kwarg, posarg + kwarg))
        ...     return posarg + kwarg
        >>> ac.is_outdated(myfunc, 2, 5)
        True
        >>> myfunc(2, 5)
         Calcing 2 + 5 = 7
        7
        >>> ac.is_outdated(myfunc, 2, 5)
        False
        """
        return func.is_outdated(*args, **kwargs)

    def remove(self, func, *args, **kwargs):
        """
        Remove cache data for `func` used with `args` and `kwargs`.

        `func` must be a function decorated by :any:`AnyCache.anycache`.

        >>> from anycache import AnyCache
        >>> ac = AnyCache()
        >>> @ac.anycache()
        ... def myfunc(posarg, kwarg=3):
        ...     print(" Calcing %r + %r = %r" % (posarg, kwarg, posarg + kwarg))
        ...     return posarg + kwarg
        >>> myfunc(2, 5)
         Calcing 2 + 5 = 7
        7
        >>> ac.remove(myfunc, 2, 5)
        >>> myfunc(2, 5)
         Calcing 2 + 5 = 7
        7

        Removing non-existing cache entries is not an error:

        >>> ac.remove(myfunc, 2, 5)
        >>> ac.remove(myfunc, 2, 5)
        """
        return func.remove(*args, **kwargs)

    def get_ident(self, func, *args, **kwargs):
        """
        Return identification string for `func` used with `args` and `kwargs`.

        `func` must be a function decorated by :any:`AnyCache.anycache`.

        >>> from anycache import AnyCache
        >>> ac = AnyCache()
        >>> @ac.anycache()
        ... def myfunc(posarg, kwarg=3):
        ...     print(" Calcing %r + %r = %r" % (posarg, kwarg, posarg + kwarg))
        ...     return posarg + kwarg
        >>> @ac.anycache()
        ... def otherfunc(posarg, kwarg=3):
        ...     print(" Calcing %r + %r = %r" % (posarg, kwarg, posarg + kwarg))
        ...     return posarg + kwarg
        >>> ac.get_ident(myfunc, 2, 5)
        '19044d3869955fa79d7f3db8fcdc5af84b3f55c0bdab2b4aee1bb21e1a9856c9'
        >>> ac.get_ident(myfunc, 2, 6)
        '1885e09f9898a1f1bd186f052d0e810693faa14b70e7b6b22de61b90c8171427'
        >>> ac.get_ident(otherfunc, 2, 5)
        '9b8aea26422999aaa7aed0fdb4d5145fd33b87de20f322cf175997e9b1835158'
        """
        return func.get_ident(*args, **kwargs)

    def clear(self):
        """Clear the cache by removing all cache files and the cache directory."""
        # destructor safe implementation: the attribute may not exist yet
        try:
            cachedir = self.__cachedir
        except AttributeError:  # pragma: no cover
            cachedir = None
        if cachedir and cachedir.exists():
            logging.getLogger(__name__).debug("CLEARING cache '%s'", cachedir)
            for path in cachedir.glob("*"):
                path.unlink()
            cachedir.rmdir()

    @property
    def size(self):
        """Return total size of all cache files."""
        if self.cachedir.exists():
            size = sum(path.stat().st_size for path in self.cachedir.glob("*"))
        else:
            size = 0
        return size

    def _anycache(self, funcinfo):
        """Return the cached result for `funcinfo`, computing and storing it if needed."""
        logger = logging.getLogger(__name__)
        ident = self._get_ident(funcinfo)
        ce = _CacheInfo.create_ce_from_ident(self.cachedir, ident)
        self._ensure_cachedir()
        # try to read
        valid, result = AnyCache.__read(logger, ce)
        if not valid:
            func, args, kwargs, depfilefunc = funcinfo
            # execute
            result = func(*args, **kwargs)
            # deps
            deps = tuple(depfilefunc(result, *args, **kwargs)) if depfilefunc else []
            # write
            AnyCache.__write(logger, ce, result, deps)
            # remove old
            AnyCache.__tidyup(logger, self.cachedir, self.maxsize)
        return result

    def _is_outdated(self, funcinfo):
        """Return `True` if there is no up-to-date cache entry for `funcinfo`."""
        logger = logging.getLogger(__name__)
        ident = self._get_ident(funcinfo)
        ce = _CacheInfo.create_ce_from_ident(self.cachedir, ident)
        self._ensure_cachedir()
        with ce.lock:
            is_outdated = AnyCache.__is_outdated(logger, ce)
        return is_outdated

    def _remove(self, funcinfo):
        """Remove the cache entry for `funcinfo`, if any."""
        logger = logging.getLogger(__name__)
        ident = self._get_ident(funcinfo)
        ce = _CacheInfo.create_ce_from_ident(self.cachedir, ident)
        self._ensure_cachedir()
        AnyCache.__remove(logger, ce)

    @staticmethod
    def _get_ident(fi):
        """
        Return the SHA-256 hex digest identifying the call described by `fi`.

        The digest is built from the module and name of the function and the
        string representation of the positional and keyword arguments.
        """
        func = fi.func
        # Keyword arguments are sorted by name, so that the same call written
        # with a different keyword order maps to the same cache entry.
        kwargs = dict(sorted(fi.kwargs.items()))
        name = "%s.%s(%s, %s)" % (func.__module__, func.__name__, fi.args, kwargs)
        h = hashlib.sha256()
        h.update(_bytes(name))
        return h.hexdigest()

    def _ensure_cachedir(self):
        """Create the cache directory including parents, if it does not exist."""
        try:
            self.cachedir.mkdir(parents=True)
        except FileExistsError:
            pass

    @staticmethod
    def __is_outdated(logger, ce):
        """
        Return `True` if the cache entry `ce` is missing, outdated or unreadable.

        The entry is up-to-date if data and dep file exist and no file listed
        in the dep file is newer than the data file.
        """
        outdated = True
        if ce.dep.exists() and ce.data.exists():
            data_mtime = ce.data.stat().st_mtime
            # pylint: disable=broad-exception-caught
            try:
                with open(str(ce.dep), "r", encoding="utf-8") as depfile:
                    outdated = any((pathlib.Path(line.rstrip()).stat().st_mtime > data_mtime) for line in depfile)
            except Exception:
                # e.g. undecodable dep file or a listed file which cannot be
                # stat'ed; `outdated` stays `True`
                logger.warning("CORRUPT cache dep '%s'", ce.dep)
        return outdated

    @staticmethod
    def __read(logger, ce):
        """
        Try to read the cache entry `ce`.

        Returns:
            Tuple `(valid, result)`. `result` is only meaningful if `valid`
            is `True`.
        """
        valid, result = False, None
        with ce.lock:
            if not AnyCache.__is_outdated(logger, ce):
                with open(str(ce.data), "rb") as cachefile:
                    # NOTE: unpickling executes code stored in the cache file.
                    # The cache directory must only be writable by trusted users.
                    # pylint: disable=broad-exception-caught
                    try:
                        result, valid = pickle.load(cachefile), True
                        logger.info("READING cache entry '%s'", ce.ident)
                    except Exception as exc:
                        logger.warning("CORRUPT cache entry '%s'. %r", ce.data, exc)
                        # refresh the modification time of the corrupt file
                        ce.data.touch()
        return valid, result

    @staticmethod
    def __write(logger, ce, result, deps):
        """
        Write `result` and the dependency file paths `deps` to cache entry `ce`.

        Failures are logged as warning and not raised.
        """
        logger.info("WRITING cache entry '%s'", ce.ident)
        # we need to lock the cache for write
        # writing takes a long time, so we are writing to temporary files, lock and copy over.
        # pylint: disable=broad-exception-caught
        try:
            with tempfile.NamedTemporaryFile("wb", prefix="anycache-", suffix=_CACHE_SUFFIX) as datatmpfile:
                # The dep file is read back as UTF-8, so it has to be written
                # as UTF-8 too, independent of the locale encoding.
                with tempfile.NamedTemporaryFile(
                    "w", encoding="utf-8", prefix="anycache-", suffix=_DEP_SUFFIX
                ) as deptmpfile:
                    # data
                    pickle.dump(result, datatmpfile)
                    datatmpfile.flush()
                    # dep
                    for dep in deps:
                        deptmpfile.write("%s\n" % (dep,))
                    deptmpfile.flush()
                    # copy over
                    with ce.lock:
                        shutil.copyfile(datatmpfile.name, str(ce.data))
                        shutil.copyfile(deptmpfile.name, str(ce.dep))
        except Exception as exc:  # pragma: no cover
            logger.warning("FAILED cache write '%s'. %r", ce.data, exc)

    @staticmethod
    def __tidyup(logger, cachedir, maxsize):
        """Remove the oldest cache entries of `cachedir` while it exceeds `maxsize`."""
        if maxsize is not None:
            cacheinfo = _CacheInfo(cachedir)
            totalsize = cacheinfo.totalsize
            # oldest entries first
            ceis = collections.deque(sorted(cacheinfo.cacheentryinfos, key=lambda info: info.mtime))
            # NOTE(review): removal stops as soon as two entries are left,
            # while the class documentation only promises to keep the last
            # cached object - confirm the intended threshold.
            while (totalsize > maxsize) and (len(ceis) > 2):
                cei = ceis.popleft()
                totalsize -= cei.size
                AnyCache.__remove(logger, cei.ce)

    @staticmethod
    def __remove(logger, ce):
        """Remove data and dep file of cache entry `ce`. Missing files are ignored."""
        with ce.lock:
            if ce.data.exists():
                ce.data.unlink()
            if ce.dep.exists():
                ce.dep.unlink()
        logger.info("REMOVING cache entry '%s'", ce.ident)
# Module-wide default cache instance. Created on first use by
# `get_defaultcache`; `None` until then.
__DEFAULT_CACHE = None
def anycache(cachedir=None, maxsize=None, depfilefunc=None):
    """
    Decorator to cache result of function depending on arguments.

    Without `cachedir` and `maxsize` the decorator uses one unlimited global
    cache within one python run. Otherwise a dedicated :any:`AnyCache`
    instance is created for the decorated function.
    Different anycached functions have different cache name spaces and do not
    influence each other.
    To preserve the cache result between multiple python runs, use an
    :any:`AnyCache` instance with a persistent `cachedir`.

    Keyword Args:
        cachedir: Directory for cached python objects. :any:`AnyCache`
                  instances on the same `cachedir` share the same cache.
        maxsize: Maximum cache size in bytes.
                 `None` does not limit the cache size.
                 `0` disables caching.
                 If the maximum size is smaller than the last cached object,
                 this object is kept.
                 During object write the cache size might be larger than
                 `maxsize`. At maximum twice as large as the maximum object
                 size.
        depfilefunc: Dependency file function (see example below)

    >>> from anycache import anycache
    >>> @anycache()
    ... def myfunc(posarg, kwarg=3):
    ...     print(" Calcing %r + %r = %r" % (posarg, kwarg, posarg + kwarg))
    ...     return posarg + kwarg
    >>> myfunc(2, 5)
     Calcing 2 + 5 = 7
    7
    >>> myfunc(2, 5)
    7

    File I/O is not tracked by the decorator.
    Instead a function needs to be implemented, which returns the paths
    of the files, which influence the function result.
    The `depfilefunc` is called with the function result and all arguments.
    The following example, depends on the path of the source code itself:

    >>> def mydepfilefunc(result, posarg, kwarg=3):
    ...     print(" Deps of %r + %r = %r" % (posarg, kwarg, result))
    ...     return [__file__]
    >>> @anycache(depfilefunc=mydepfilefunc)
    ... def myfunc(posarg, kwarg=3):
    ...     print(" Calcing %r + %r = %r" % (posarg, kwarg, posarg + kwarg))
    ...     return posarg + kwarg
    >>> myfunc(2, 7)
     Calcing 2 + 7 = 9
     Deps of 2 + 7 = 9
    9
    """
    # The shared default cache is only used if nothing was customized.
    use_default = cachedir is None and maxsize is None
    cache = get_defaultcache() if use_default else AnyCache(cachedir=cachedir, maxsize=maxsize)
    return cache.anycache(depfilefunc=depfilefunc)
def get_defaultcache():
    """
    Return unlimited default :any:`AnyCache` instance.

    The instance is created on the first call and returned by every later call.
    """
    # pylint: disable=global-statement
    global __DEFAULT_CACHE
    cache = __DEFAULT_CACHE
    if cache is None:
        cache = __DEFAULT_CACHE = AnyCache()
    return cache