# Source code for petastorm.local_disk_cache
# Copyright (c) 2017-2018 Uber Technologies, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import division
import shutil
from diskcache import FanoutCache
from petastorm.cache import CacheBase
class LocalDiskCache(CacheBase):
    """Adapter exposing a ``diskcache.FanoutCache`` through the ``CacheBase`` interface."""

    def __init__(self, path, size_limit_bytes, expected_row_size_bytes, shards=6, cleanup=False, **settings):
        """LocalDiskCache is an adapter to a diskcache implementation.

        LocalDiskCache can be used by a petastorm Reader class to temporarily keep parts of the dataset on a local
        file system storage.

        :param path: Path where the dataset cache is being stored.
        :param size_limit_bytes: Maximal size of the disk-space to be used by cache. The size of the cache may actually
          grow somewhat above the size_limit_bytes, so the limit is not very strict.
        :param expected_row_size_bytes: Approximate size of a single row. This argument is used to perform a sanity
          check on the capacity of individual shards.
        :param shards: Cache can be sharded. Larger number of shards improve writing parallelism.
        :param cleanup: If set to True, cache directory would be removed when cleanup() method is called.
        :param settings: these parameters passed-through to the diskcache.Cache class.
          For details, see: http://www.grantjenks.com/docs/diskcache/tutorial.html#settings
        :raises ValueError: if the per-shard share of ``size_limit_bytes`` is smaller than five expected rows.
        """
        # Each shard must have room for at least a handful of rows; otherwise a freshly stored value could be
        # evicted right away. The message spells out the requirement (>=), not the failing condition.
        if size_limit_bytes / shards < 5 * expected_row_size_bytes:
            raise ValueError('Condition \'size_limit_bytes / shards >= 5 * expected_row_size_bytes\' needs to hold, '
                             'otherwise, newly added cached values might end up being immediately evicted.')

        default_settings = {
            'size_limit': size_limit_bytes,
            'eviction_policy': 'least-recently-stored',
        }
        # Caller-supplied settings take precedence over the defaults above.
        default_settings.update(settings)

        self._cleanup = cleanup
        self._path = path
        self._cache = FanoutCache(path, shards, **default_settings)

    def get(self, key, fill_cache_func):
        """Return the value cached under ``key``, computing and storing it on a miss.

        :param key: Cache key to look up.
        :param fill_cache_func: Zero-argument callable invoked to produce the value when ``key`` is not cached.
        :return: The cached value, or the freshly computed result of ``fill_cache_func()``.

        Note: ``None`` is used as the "missing" marker, so a stored ``None`` is indistinguishable from a miss and
        ``fill_cache_func`` is invoked again on every lookup of such a key.
        """
        value = self._cache.get(key, default=None)
        if value is None:
            value = fill_cache_func()
            self._cache.set(key, value)

        return value

    def cleanup(self):
        """Remove the cache directory if ``cleanup=True`` was passed to the constructor; otherwise do nothing."""
        if self._cleanup:
            # Release the cache's database connection before deleting the files underneath it.
            self._cache.close()
            shutil.rmtree(self._path)