// Number of bytes in one megabyte (2^20). Used to convert the configured
// cache size from megabytes to bytes and to print sizes in MB.
static const unsigned long long BYTES_PER_MEG = 1048576ULL;

// Largest cache size, in megabytes, that can be converted to bytes without
// overflowing an unsigned 64-bit value. The configured size is clamped to
// this value and then multiplied by BYTES_PER_MEG. With a cap of exactly
// 2^44 the product is 2^64, which wraps to 0 and turns a huge requested
// size into a zero-byte cache; one less keeps the product below 2^64.
static const unsigned long long MAX_CACHE_SIZE_IN_MEGABYTES = (1ULL << 44) - 1;
81 d_cache_dir(cache_dir), d_prefix(prefix), d_max_cache_size_in_bytes(size)
83 m_initialize_cache_info();
88 d_cache_dir = cache_dir;
90 d_max_cache_size_in_bytes = size;
92 m_initialize_cache_info();
95 static inline string get_errno() {
96 char *s_err = strerror(errno);
100 return "Unknown error.";
104 static inline struct flock *lock(
int type) {
105 static struct flock lock;
107 lock.l_whence = SEEK_SET;
110 lock.l_pid = getpid();
115 inline void BESFileLockingCache::m_record_descriptor(
const string &file,
int fd) {
116 BESDEBUG(
"cache",
"DAP Cache: recording descriptor: " << file <<
", " << fd << endl);
117 d_locks.insert(std::pair<string, int>(file, fd));
120 inline int BESFileLockingCache::m_get_descriptor(
const string &file) {
121 BESDEBUG(
"cache",
"BESFileLockingCache::m_get_descriptor; d_locks size: " << d_locks.size() << endl);
122 FilesAndLockDescriptors::iterator i = d_locks.find(file);
123 if (i == d_locks.end())
127 BESDEBUG(
"cache",
"DAP Cache: getting descriptor: " << file <<
", " << fd << endl);
137 static void unlock(
int fd)
139 if (fcntl(fd, F_SETLK, lock(F_UNLCK)) == -1) {
140 throw BESInternalError(
"An error occurred trying to unlock the file: " + get_errno(), __FILE__, __LINE__);
144 throw BESInternalError(
"Could not close the (just) unlocked file.", __FILE__, __LINE__);
159 static bool getSharedLock(
const string &file_name,
int &ref_fd)
161 BESDEBUG(
"cache",
"getSharedLock: " << file_name <<endl);
164 if ((fd = open(file_name.c_str(), O_RDONLY)) < 0) {
174 struct flock *l = lock(F_RDLCK);
175 if (fcntl(fd, F_SETLKW, l) == -1) {
178 oss <<
"cache process: " << l->l_pid <<
" triggered a locking error: " << get_errno();
182 BESDEBUG(
"cache",
"getSharedLock exit: " << file_name <<endl);
203 BESDEBUG(
"cache",
"BESFileLockingCache::getExclusiveLock() - " << file_name <<endl);
206 if ((fd = open(file_name.c_str(), O_RDWR)) < 0) {
216 struct flock *l = lock(F_WRLCK);
217 if (fcntl(fd, F_SETLKW, l) == -1) {
220 oss <<
"cache process: " << l->l_pid <<
" triggered a locking error: " << get_errno();
224 BESDEBUG(
"cache",
"BESFileLockingCache::getExclusiveLock() - exit: " << file_name <<endl);
242 static bool getExclusiveLockNB(
string file_name,
int &ref_fd)
244 BESDEBUG(
"cache",
"getExclusiveLock_nonblocking: " << file_name <<endl);
247 if ((fd = open(file_name.c_str(), O_RDWR)) < 0) {
257 struct flock *l = lock(F_WRLCK);
258 if (fcntl(fd, F_SETLK, l) == -1) {
261 BESDEBUG(
"cache",
"getExclusiveLock_nonblocking exit (false): " << file_name <<
" by: " << l->l_pid << endl);
268 oss <<
"cache process: " << l->l_pid <<
" triggered a locking error: " << get_errno();
274 BESDEBUG(
"cache",
"getExclusiveLock_nonblocking exit (true): " << file_name <<endl);
294 static bool createLockedFile(
string file_name,
int &ref_fd)
296 BESDEBUG(
"cache",
"createLockedFile: " << file_name <<endl);
299 if ((fd = open(file_name.c_str(), O_CREAT | O_EXCL | O_RDWR, 0666)) < 0) {
309 struct flock *l = lock(F_WRLCK);
310 if (fcntl(fd, F_SETLKW, l) == -1) {
313 oss <<
"cache process: " << l->l_pid <<
" triggered a locking error: " << get_errno();
317 BESDEBUG(
"cache",
"createLockedFile exit: " << file_name <<endl);
325 void BESFileLockingCache::m_check_ctor_params()
327 if (d_cache_dir.empty()) {
328 string err =
"The cache directory was not specified";
333 int statret = stat(d_cache_dir.c_str(), &buf);
334 if (statret != 0 || !S_ISDIR(buf.st_mode)) {
336 int status = mkdir(d_cache_dir.c_str(), 0775);
338 string err =
"The cache directory " + d_cache_dir +
" does not exist or could not be created.";
343 if (d_prefix.empty()) {
344 string err =
"The cache file prefix was not specified, must not be empty";
348 if (d_max_cache_size_in_bytes <= 0) {
349 string err =
"The cache size was not specified, must be greater than zero";
353 BESDEBUG(
"cache",
"DAP Cache: directory " << d_cache_dir <<
", prefix " << d_prefix
354 <<
", max size " << d_max_cache_size_in_bytes << endl );
358 void BESFileLockingCache::m_initialize_cache_info()
362 d_max_cache_size_in_bytes =
min(d_max_cache_size_in_bytes, MAX_CACHE_SIZE_IN_MEGABYTES);
363 d_max_cache_size_in_bytes *= BYTES_PER_MEG;
364 d_target_size = d_max_cache_size_in_bytes * 0.8;
366 m_check_ctor_params();
368 d_cache_info = d_cache_dir +
"/dap.cache.info";
372 if (createLockedFile(d_cache_info, d_cache_info_fd)) {
374 unsigned long long size = 0;
375 if (write(d_cache_info_fd, &size,
sizeof(
unsigned long long)) !=
sizeof(
unsigned long long))
376 throw BESInternalError(
"Could not write size info to the cache info file `"+d_cache_info+
"`", __FILE__, __LINE__);
382 if ((d_cache_info_fd = open(d_cache_info.c_str(), O_RDWR)) == -1) {
389 BESDEBUG(
"cache",
"d_cache_info_fd: " << d_cache_info_fd << endl);
412 if (target.at(0) ==
'/') {
413 target = src.substr(1, target.length() - 1);
415 string::size_type slash = 0;
416 while ((slash = target.find(
'/')) != string::npos) {
417 target.replace(slash, 1, 1, BESFileLockingCache::DAP_CACHE_CHAR);
419 string::size_type last_dot = target.rfind(
'.');
420 if (last_dot != string::npos) {
421 target = target.substr(0, last_dot);
424 BESDEBUG(
"cache",
" d_cache_dir: '" << d_cache_dir <<
"'" << endl);
425 BESDEBUG(
"cache",
" d_prefix: '" << d_prefix <<
"'" << endl);
426 BESDEBUG(
"cache",
" target: '" << target <<
"'" << endl);
428 return d_cache_dir +
"/" + d_prefix + BESFileLockingCache::DAP_CACHE_CHAR + target;
452 bool status = getSharedLock(target, fd);
454 BESDEBUG(
"cache",
"BESFileLockingCache: get_read_lock: " << target <<
" (status: " << status <<
", fd: " << fd <<
")" << endl);
457 m_record_descriptor(target, fd);
480 bool status = createLockedFile(target, fd);
482 BESDEBUG(
"cache",
"BESFileLockingCache: create_and_lock: " << target <<
" (status: " << status <<
", fd: " << fd <<
")" << endl);
485 m_record_descriptor(target, fd);
509 lock.l_type = F_RDLCK;
510 lock.l_whence = SEEK_SET;
513 lock.l_pid = getpid();
515 if (fcntl(fd, F_SETLKW, &lock) == -1) {
530 BESDEBUG(
"cache",
"lock_cache - d_cache_info_fd: " << d_cache_info_fd << endl);
532 if (fcntl(d_cache_info_fd, F_SETLKW, lock(F_WRLCK)) == -1) {
533 throw BESInternalError(
"An error occurred trying to lock the cache-control file" + get_errno(), __FILE__, __LINE__);
542 BESDEBUG(
"cache",
"lock_cache - d_cache_info_fd: " << d_cache_info_fd << endl);
544 if (fcntl(d_cache_info_fd, F_SETLKW, lock(F_RDLCK)) == -1) {
545 throw BESInternalError(
"An error occurred trying to lock the cache-control file" + get_errno(), __FILE__, __LINE__);
556 BESDEBUG(
"cache",
"DAP Cache: unlock: cache_info (fd: " << d_cache_info_fd <<
")" << endl);
558 if (fcntl(d_cache_info_fd, F_SETLK, lock(F_UNLCK)) == -1) {
559 throw BESInternalError(
"An error occurred trying to unlock the cache-control file" + get_errno(), __FILE__, __LINE__);
580 BESDEBUG(
"cache",
"DAP Cache: unlock file: " << file_name << endl);
582 int fd = m_get_descriptor(file_name);
585 fd = m_get_descriptor(file_name);
603 BESDEBUG(
"cache",
"DAP Cache: unlock fd: " << fd << endl);
607 BESDEBUG(
"cache",
"DAP Cache: unlock " << fd <<
" Success" << endl);
623 unsigned long long current_size;
627 if (lseek(d_cache_info_fd, 0, SEEK_SET) == -1)
628 throw BESInternalError(
"Could not rewind to front of cache info file.", __FILE__, __LINE__);
631 if (read(d_cache_info_fd, ¤t_size,
sizeof(
unsigned long long)) !=
sizeof(
unsigned long long))
632 throw BESInternalError(
"Could not get read size info from the cache info file!", __FILE__, __LINE__);
635 int statret = stat(target.c_str(), &buf);
637 current_size += buf.st_size;
639 throw BESInternalError(
"Could not read the size of the new file: " + target +
" : " + get_errno(), __FILE__, __LINE__);
641 BESDEBUG(
"cache",
"DAP Cache: cache size updated to: " << current_size << endl);
643 if (lseek(d_cache_info_fd, 0, SEEK_SET) == -1)
644 throw BESInternalError(
"Could not rewind to front of cache info file.", __FILE__, __LINE__);
646 if(write(d_cache_info_fd, ¤t_size,
sizeof(
unsigned long long)) !=
sizeof(
unsigned long long))
647 throw BESInternalError(
"Could not write size info from the cache info file!", __FILE__, __LINE__);
665 return current_size > d_max_cache_size_in_bytes;
677 unsigned long long current_size;
681 if (lseek(d_cache_info_fd, 0, SEEK_SET) == -1)
682 throw BESInternalError(
"Could not rewind to front of cache info file.", __FILE__, __LINE__);
684 if(read(d_cache_info_fd, ¤t_size,
sizeof(
unsigned long long)) !=
sizeof(
unsigned long long))
685 throw BESInternalError(
"Could not get read size info from the cache info file!", __FILE__, __LINE__);
704 unsigned long long BESFileLockingCache::m_collect_cache_dir_info(
CacheFiles &contents)
706 DIR *dip = opendir(d_cache_dir.c_str());
708 throw BESInternalError(
"Unable to open cache directory " + d_cache_dir, __FILE__, __LINE__);
711 vector<string> files;
714 while ((dit = readdir(dip)) !=
NULL) {
715 string dirEntry = dit->d_name;
716 if (dirEntry.compare(0, d_prefix.length(), d_prefix) == 0) {
717 files.push_back(d_cache_dir +
"/" + dirEntry);
723 unsigned long long current_size = 0;
725 for (vector<string>::iterator file = files.begin(); file != files.end(); ++file) {
726 if (stat(file->c_str(), &buf) == 0) {
727 current_size += buf.st_size;
730 entry.
size = buf.st_size;
731 entry.
time = buf.st_atime;
735 throw BESInternalError(
"Zero-byte file found in cache. " + *file, __FILE__, __LINE__);
737 contents.push_back(entry);
742 contents.sort(entry_op);
760 BESDEBUG(
"cache",
"purge - starting the purge" << endl);
766 unsigned long long computed_size = m_collect_cache_dir_info(contents);
769 BESDEBUG(
"cache",
"BEFORE Purge " << computed_size/BYTES_PER_MEG << endl );
770 CacheFiles::iterator ti = contents.begin();
771 CacheFiles::iterator te = contents.end();
772 for (; ti != te; ti++) {
773 BESDEBUG(
"cache", (*ti).time <<
": " << (*ti).name <<
": size " << (*ti).size/BYTES_PER_MEG << endl );
777 BESDEBUG(
"cache",
"purge - current and target size (in MB) " << computed_size/BYTES_PER_MEG <<
", " << d_target_size/BYTES_PER_MEG << endl );
784 CacheFiles::iterator i = contents.begin();
785 while (i != contents.end() && computed_size > d_target_size) {
790 if (i->name != new_file && getExclusiveLockNB(i->name, cfile_fd)) {
791 BESDEBUG(
"cache",
"purge: " << i->name <<
" removed." << endl );
793 if (unlink(i->name.c_str()) != 0)
794 throw BESInternalError(
"Unable to purge the file " + i->name +
" from the cache: " + get_errno(), __FILE__, __LINE__);
797 computed_size -= i->size;
801 BESDEBUG(
"cache",
"purge - current and target size (in MB) " << computed_size/BYTES_PER_MEG <<
", " << d_target_size/BYTES_PER_MEG << endl );
806 if (lseek(d_cache_info_fd, 0, SEEK_SET) == -1)
807 throw BESInternalError(
"Could not rewind to front of cache info file.", __FILE__, __LINE__);
809 if(write(d_cache_info_fd, &computed_size,
sizeof(
unsigned long long)) !=
sizeof(
unsigned long long))
810 throw BESInternalError(
"Could not write size info to the cache info file!", __FILE__, __LINE__);
814 computed_size = m_collect_cache_dir_info(contents);
815 BESDEBUG(
"cache",
"AFTER Purge " << computed_size/BYTES_PER_MEG << endl );
816 CacheFiles::iterator ti = contents.begin();
817 CacheFiles::iterator te = contents.end();
818 for (; ti != te; ti++) {
819 BESDEBUG(
"cache", (*ti).time <<
": " << (*ti).name <<
": size " << (*ti).size/BYTES_PER_MEG << endl );
844 BESDEBUG(
"cache",
"BESFileLockingCache::purge_file() - starting the purge" << endl);
853 unsigned long long size = 0;
855 if (stat(file.c_str(), &buf) == 0) {
859 BESDEBUG(
"cache",
"BESFileLockingCache::purge_file() - " << file <<
" removed." << endl );
861 if (unlink(file.c_str()) != 0)
862 throw BESInternalError(
"Unable to purge the file " + file +
" from the cache: " + get_errno(), __FILE__, __LINE__);
868 if (lseek(d_cache_info_fd, 0, SEEK_SET) == -1)
869 throw BESInternalError(
"Could not rewind to front of cache info file.", __FILE__, __LINE__);
871 if (write(d_cache_info_fd, &cache_size,
sizeof(
unsigned long long)) !=
sizeof(
unsigned long long))
872 throw BESInternalError(
"Could not write size info to the cache info file!", __FILE__, __LINE__);
903 strm <<
BESIndent::LMarg <<
"BESFileLockingCache::dump - (" << (
void *)
this <<
")" << endl;
907 strm <<
BESIndent::LMarg <<
"size (bytes): " << d_max_cache_size_in_bytes << endl;
virtual void unlock_cache()
Unlock the cache info file.
#define BESISDEBUG(x)
macro used to determine if the specified debug context is set
exception thrown if internal error encountered
virtual bool create_and_lock(const string &target, int &fd)
Create a file in the cache and lock it for write access.
const string getCacheDirectory()
virtual unsigned long long get_cache_size()
Get the cache size.
void initialize(const string &cache_dir, const string &prefix, unsigned long long size)
virtual void dump(ostream &strm) const
dumps information about this object
static ostream & LMarg(ostream &strm)
virtual void purge_file(const string &file)
Purge a single file from the cache.
std::list< cache_entry > CacheFiles
virtual bool cache_too_big(unsigned long long current_size) const
look at the cache size; is it too large? Look at the cache size and see if it is too big...
virtual void lock_cache_write()
Get an exclusive lock on the 'cache info' file.
virtual string get_cache_file_name(const string &src, bool mangle=true)
Build the name of the file that will hold the uncompressed data from 'src' in the cache.
const string getCacheFilePrefix()
virtual bool get_read_lock(const string &target, int &fd)
Get a read-only lock on the file if it exists.
virtual void update_and_purge(const string &new_file)
Purge files from the cache.
virtual unsigned long long update_cache_info(const string &target)
Update the cache info file to include 'target'.
virtual void lock_cache_read()
Get a shared lock on the 'cache info' file.
virtual void exclusive_to_shared_lock(int fd)
Transfer from an exclusive lock to a shared lock.
#define BESDEBUG(x, y)
macro used to send debug information to the debug stream
virtual bool getExclusiveLock(string file_name, int &ref_fd)
Get an exclusive read/write lock on an existing file.
virtual void unlock_and_close(const string &target)
Unlock the named file.