32 #ifdef HAVE_TR1_FUNCTIONAL
33 #include <tr1/functional>
41 #include <DapXmlNamespaces.h>
42 #include <ConstraintEvaluator.h>
43 #include <DDXParserSAX2.h>
47 #include <D4EnumDefs.h>
48 #include <D4Dimensions.h>
51 #include <D4ParserSax2.h>
52 #include <XDRStreamMarshaller.h>
53 #include <XDRStreamUnMarshaller.h>
54 #include <chunked_istream.h>
55 #include <D4StreamUnMarshaller.h>
58 #include <mime_util.h>
69 #ifdef HAVE_TR1_FUNCTIONAL
70 #define HASH_OBJ std::tr1::hash
72 #define HASH_OBJ std::hash
76 #define BES_DATA_ROOT "BES.Data.RootDirectory"
77 #define BES_CATALOG_ROOT "BES.Catalog.catalog.RootDirectory"
87 unsigned long BESStoredDapResultCache::getCacheSizeFromConfig(){
91 unsigned long size_in_megabytes = 0;
94 istringstream iss(size);
95 iss >> size_in_megabytes;
98 string msg =
"[ERROR] BESStoredDapResultCache::getCacheSize() - The BES Key " + SIZE_KEY +
" is not set! It MUST be set to utilize the Stored Result Caching system. ";
102 return size_in_megabytes;
105 string BESStoredDapResultCache::getSubDirFromConfig(){
111 string msg =
"[ERROR] BESStoredDapResultCache::getSubDirFromConfig() - The BES Key " + SUBDIR_KEY +
" is not set! It MUST be set to utilize the Stored Result Caching system. ";
116 while(*subdir.begin() ==
'/' && subdir.length()>0){
117 subdir = subdir.substr(1);
127 string BESStoredDapResultCache::getResultPrefixFromConfig(){
135 string msg =
"[ERROR] BESStoredDapResultCache::getResultPrefix() - The BES Key " + PREFIX_KEY +
" is not set! It MUST be set to utilize the Stored Result Caching system. ";
144 string BESStoredDapResultCache::getBesDataRootDirFromConfig(){
146 string cacheDir =
"";
151 string msg = ((string)
"[ERROR] BESStoredDapResultCache::getStoredResultsDir() - Neither the BES Key ") +
BES_CATALOG_ROOT +
152 "or the BES key " +
BES_DATA_ROOT +
" have been set! One MUST be set to utilize the Stored Result Caching system. ";
167 string firstPathFragment = firstPart;
168 string secondPathFragment = secondPart;
172 if(*firstPathFragment.begin() !=
'/')
173 firstPathFragment =
"/" + firstPathFragment;
177 while(*firstPathFragment.rbegin() ==
'/' && firstPathFragment.length()>0){
178 firstPathFragment = firstPathFragment.substr(0,firstPathFragment.length()-1);
183 if(*firstPathFragment.rbegin() !=
'/'){
184 firstPathFragment +=
"/";
189 while(*secondPathFragment.begin() ==
'/' && secondPathFragment.length()>0){
190 secondPathFragment = secondPathFragment.substr(1);
195 string newPath = firstPathFragment + secondPathFragment;
203 BESStoredDapResultCache::BESStoredDapResultCache(){
204 BESDEBUG(
"cache",
"BESStoredDapResultCache::BESStoredDapResultCache() - BEGIN" << endl);
206 d_storedResultsSubdir = getSubDirFromConfig();
207 d_dataRootDir = getBesDataRootDirFromConfig();
208 string resultsDir = assemblePath(d_dataRootDir,d_storedResultsSubdir);
210 d_resultFilePrefix = getResultPrefixFromConfig();
211 d_maxCacheSize = getCacheSizeFromConfig();
213 BESDEBUG(
"cache",
"BESStoredDapResultCache() - Stored results cache configuration params: " << resultsDir <<
", " << d_resultFilePrefix <<
", " << d_maxCacheSize << endl);
215 initialize(resultsDir, d_resultFilePrefix, d_maxCacheSize);
217 BESDEBUG(
"cache",
"BESStoredDapResultCache::BESStoredDapResultCache() - END" << endl);
224 BESStoredDapResultCache::BESStoredDapResultCache(
const string &data_root_dir,
const string &stored_results_subdir,
const string &result_file_prefix,
unsigned long long max_cache_size){
226 d_storedResultsSubdir = stored_results_subdir;
227 d_dataRootDir = data_root_dir;
228 d_resultFilePrefix = result_file_prefix;
229 d_maxCacheSize = max_cache_size;
230 initialize(assemblePath(d_dataRootDir,stored_results_subdir), d_resultFilePrefix, d_maxCacheSize);
237 if (d_instance == 0){
238 if(dir_exists(data_root_dir)){
240 d_instance =
new BESStoredDapResultCache(data_root_dir, stored_results_subdir, result_file_prefix, max_cache_size);
243 BESDEBUG(
"cache",
"[ERROR] BESStoredDapResultCache::get_instance(): Failed to obtain cache! msg: " << bie.
get_message() << endl);
256 if (d_instance == 0) {
261 BESDEBUG(
"cache",
"[ERROR] BESStoredDapResultCache::get_instance(): Failed to obtain cache! msg: " << bie.
get_message() << endl);
269 void BESStoredDapResultCache::delete_instance() {
270 BESDEBUG(
"cache",
"BESStoredDapResultCache::delete_instance() - Deleting singleton BESStoredDapResultCache instance." << endl);
284 bool BESStoredDapResultCache::is_valid(
const string &cache_file_name,
const string &dataset)
289 off_t entry_size = 0;
290 time_t entry_time = 0;
292 if (stat(cache_file_name.c_str(), &buf) == 0) {
293 entry_size = buf.st_size;
294 entry_time = buf.st_mtime;
303 time_t dataset_time = entry_time;
304 if (stat(dataset.c_str(), &buf) == 0) {
305 dataset_time = buf.st_mtime;
313 if (dataset_time > entry_time)
330 bool BESStoredDapResultCache::read_dap2_data_from_cache(
const string &cache_file_name, DDS *fdds)
332 BESDEBUG(
"cache",
"BESStoredDapResultCache::read_dap2_data_from_cache() - Opening cache file: " << cache_file_name << endl);
337 if (get_read_lock(cache_file_name, fd)) {
339 ifstream data(cache_file_name.c_str());
342 string mime = get_next_mime_header(data);
343 while (!mime.empty()) {
344 mime = get_next_mime_header(data);
348 DDXParser ddx_parser(fdds->get_factory());
351 string boundary = read_multipart_boundary(data);
352 BESDEBUG(
"cache",
"BESStoredDapResultCache::read_dap2_data_from_cache() - MPM Boundary: " << boundary << endl);
354 read_multipart_headers(data,
"text/xml", dods_ddx);
356 BESDEBUG(
"cache",
"BESStoredDapResultCache::read_dap2_data_from_cache() - Read the multipart haeaders" << endl);
362 ddx_parser.intern_stream(data, fdds, data_cid, boundary);
363 BESDEBUG(
"cache",
"BESStoredDapResultCache::read_dap2_data_from_cache() - Dataset name: " << fdds->get_dataset_name() << endl);
366 BESDEBUG(
"cache",
"BESStoredDapResultCache::read_dap2_data_from_cache() - DDX Parser Error: " << e.get_error_message() << endl);
371 BESDEBUG(
"cache",
"BESStoredDapResultCache::read_dap2_data_from_cache() - Data CID (before): " << data_cid << endl);
372 data_cid = cid_to_header_value(data_cid);
373 BESDEBUG(
"cache",
"BESStoredDapResultCache::read_dap2_data_from_cache() - Data CID (after): " << data_cid << endl);
377 read_multipart_headers(data,
"application/octet-stream", dods_data_ddx, data_cid);
382 XDRStreamUnMarshaller um(data);
383 for (DDS::Vars_iter i = fdds->var_begin(); i != fdds->var_end(); i++) {
384 (*i)->deserialize(um, fdds);
388 unlock_and_close(cache_file_name );
392 BESDEBUG(
"cache",
"BESStoredDapResultCache - The requested file does not exist. File: " + cache_file_name);
398 BESDEBUG(
"cache",
"BESStoredDapResultCache::read_dap4_data_from_cache() - caught exception, unlocking cache and re-throw." << endl );
401 unlock_and_close(cache_file_name );
417 bool BESStoredDapResultCache::read_dap4_data_from_cache(
const string &cache_file_name, libdap::DMR *dmr)
419 BESDEBUG(
"cache",
"BESStoredDapResultCache::read_dap4_data_from_cache() - BEGIN" << endl);
425 if (get_read_lock(cache_file_name, fd)) {
426 BESDEBUG(
"cache",
"BESStoredDapResultCache::read_dap4_data_from_cache() - Opening cache file: " << cache_file_name << endl);
427 fstream in(cache_file_name.c_str(), ios::in|ios::binary);
438 chunked_istream cis(in, CHUNK_SIZE);
445 int chunk_size = cis.read_next_chunk();
447 BESDEBUG(
"cache",
"BESStoredDapResultCache::read_dap4_data_from_cache() - First chunk_size: " << chunk_size << endl);
450 if(chunk_size == EOF){
451 throw InternalErr(__FILE__, __LINE__,
"BESStoredDapResultCache::read_dap4_data_from_cache() - Failed to read first chunk from file. Chunk size = EOF (aka " + libdap::long_to_string(EOF) +
")");
455 char chunk[chunk_size];
456 cis.read(chunk, chunk_size);
457 BESDEBUG(
"cache",
"BESStoredDapResultCache::read_dap4_data_from_cache() - Read first chunk." << endl);
462 parser.intern(chunk, chunk_size-2, dmr, debug);
463 BESDEBUG(
"cache",
"BESStoredDapResultCache::read_dap4_data_from_cache() - Parsed first chunk." << endl);
465 D4StreamUnMarshaller um(cis, cis.twiddle_bytes());
467 dmr->root()->deserialize(um, *dmr);
468 BESDEBUG(
"cache",
"BESStoredDapResultCache::read_dap4_data_from_cache() - Deserialized data." << endl);
470 BESDEBUG(
"cache",
"BESStoredDapResultCache::read_dap4_data_from_cache() - END" << endl);
473 unlock_and_close(cache_file_name );
479 BESDEBUG(
"cache",
"BESStoredDapResultCache - The requested file does not exist. File: " + cache_file_name);
486 BESDEBUG(
"cache",
"BESStoredDapResultCache::read_dap4_data_from_cache() - caught exception, unlocking cache and re-throw." << endl );
489 unlock_and_close(cache_file_name );
501 BESDEBUG(
"cache",
"Reading cache for " << cache_file_name << endl);
503 DDS *fdds =
new DDS(factory);
506 if(read_dap2_data_from_cache(cache_file_name, fdds)){
509 fdds->filename(filename) ;
512 BESDEBUG(
"cache",
"DDS Filename: " << fdds->filename() << endl);
513 BESDEBUG(
"cache",
"DDS Dataset name: " << fdds->get_dataset_name() << endl);
515 fdds->set_factory( 0 ) ;
519 DDS::Vars_iter i = fdds->var_begin();
520 while(i != fdds->var_end()) {
521 (*i)->set_read_p(
true );
522 (*i++)->set_send_p(
true);
543 BESDEBUG(
"cache",
"BESStoredDapResultCache::get_cached_dap4_data() - Reading cache for " << cache_file_name << endl);
545 DMR *fdmr =
new DMR(factory);
547 BESDEBUG(
"cache",
"BESStoredDapResultCache::get_cached_dap4_data() - DMR Filename: " << fdmr->filename() << endl);
548 fdmr->set_filename(filename) ;
550 if(read_dap4_data_from_cache(cache_file_name, fdmr)){
551 BESDEBUG(
"cache",
"BESStoredDapResultCache::get_cached_dap4_data() - DMR Dataset name: " << fdmr->name() << endl);
553 fdmr->set_factory( 0 ) ;
557 fdmr->root()->set_send_p(
true);
558 fdmr->root()->set_read_p(
true);
574 BESDEBUG(
"cache",
"BESStoredDapResultCache::store_dap2_result() - BEGIN" << endl );
576 BaseTypeFactory factory;
581 string local_id = get_stored_result_local_id(dds.filename(), constraint);
582 BESDEBUG(
"cache",
"BESStoredDapResultCache::store_dap2_result() - local_id: "<< local_id << endl );
583 string cache_file_name = get_cache_file_name(local_id,
false);
584 BESDEBUG(
"cache",
"BESStoredDapResultCache::store_dap2_result() - cache_file_name: "<< cache_file_name << endl );
591 if (!is_valid(cache_file_name, dds.filename()))
592 purge_file(cache_file_name);
594 if (get_read_lock(cache_file_name, fd)) {
595 BESDEBUG(
"cache",
"BESStoredDapResultCache::store_dap2_result() - function ce (change)- cached hit: " << cache_file_name << endl);
596 fdds = get_cached_dap2_data_ddx(cache_file_name, &factory, dds.filename());
598 else if (create_and_lock(cache_file_name, fd)) {
601 BESDEBUG(
"cache",
"BESStoredDapResultCache::store_dap2_result() - function ce - caching " << cache_file_name <<
", constraint: " << constraint << endl);
604 eval->parse_constraint(constraint, *fdds);
606 if (eval->function_clauses()) {
607 DDS *temp_fdds = eval->eval_function_clauses(*fdds);
612 ofstream data_stream(cache_file_name.c_str());
614 throw InternalErr(__FILE__, __LINE__,
"Could not open '" + cache_file_name +
"' to write cached response.");
616 string start=
"dataddx_cache_start", boundary=
"dataddx_cache_boundary";
620 ConstraintEvaluator eval;
624 fdds->set_dap_version(
"3.2");
631 set_mime_multipart(data_stream, boundary, start, dods_data_ddx, x_plain, last_modified_time(rb->
get_dataset_name()));
636 data_stream <<
CRLF <<
"--" << boundary <<
"--" <<
CRLF;
643 exclusive_to_shared_lock(fd);
648 unsigned long long size = update_cache_info(cache_file_name);
649 if (cache_too_big(size))
650 update_and_purge(cache_file_name);
654 else if (get_read_lock(cache_file_name, fd)) {
655 BESDEBUG(
"cache",
"BESStoredDapResultCache::store_dap2_result() - function ce - cached hit: " << cache_file_name << endl);
656 fdds = get_cached_dap2_data_ddx(cache_file_name, &factory, dds.get_dataset_name());
659 throw InternalErr(__FILE__, __LINE__,
"BESStoredDapResultCache::store_dap2_result() - Cache error during function invocation.");
671 BESDEBUG(
"cache",
"BESStoredDapResultCache::cache_dap2_dataset() - unlocking and closing cache file "<< cache_file_name << endl );
672 unlock_and_close(cache_file_name);
676 BESDEBUG(
"cache",
"BESStoredDapResultCache::cache_dap2_dataset() - caught exception, unlocking cache and re-throw." << endl );
683 BESDEBUG(
"cache",
"BESStoredDapResultCache::store_dap2_result() - END (local_id=`"<< local_id <<
"'" << endl );
694 BESDEBUG(
"cache",
"BESStoredDapResultCache::store_dap2_result() - BEGIN" << endl );
696 BaseTypeFactory factory;
700 string local_id = get_stored_result_local_id(dds.filename(), constraint, DAP_3_2);
701 BESDEBUG(
"cache",
"BESStoredDapResultCache::store_dap2_result() - local_id: "<< local_id << endl );
702 string cache_file_name = get_cache_file_name(local_id,
false);
703 BESDEBUG(
"cache",
"BESStoredDapResultCache::store_dap2_result() - cache_file_name: "<< cache_file_name << endl );
710 if (!is_valid(cache_file_name, dds.filename()))
711 purge_file(cache_file_name);
713 if (get_read_lock(cache_file_name, fd)) {
714 BESDEBUG(
"cache",
"BESStoredDapResultCache::store_dap2_result() - Stored Result already exists. Not rewriting file: " << cache_file_name << endl);
716 else if (create_and_lock(cache_file_name, fd)) {
719 BESDEBUG(
"cache",
"BESStoredDapResultCache::store_dap2_result() - cache_file_name " << cache_file_name <<
", constraint: " << constraint << endl);
723 eval->parse_constraint(constraint, *fdds);
725 if (eval->function_clauses()) {
726 DDS *temp_fdds = eval->eval_function_clauses(*fdds);
731 ofstream data_stream(cache_file_name.c_str());
733 throw InternalErr(__FILE__, __LINE__,
"Could not open '" + cache_file_name +
"' to write cached response.");
735 string start=
"dataddx_cache_start", boundary=
"dataddx_cache_boundary";
739 ConstraintEvaluator eval;
743 fdds->set_dap_version(
"3.2");
750 set_mime_multipart(data_stream, boundary, start, dods_data_ddx, x_plain, last_modified_time(rb->
get_dataset_name()));
755 data_stream << CRLF <<
"--" << boundary <<
"--" <<
CRLF;
762 exclusive_to_shared_lock(fd);
767 unsigned long long size = update_cache_info(cache_file_name);
768 if (cache_too_big(size))
769 update_and_purge(cache_file_name);
777 else if (get_read_lock(cache_file_name, fd)) {
778 BESDEBUG(
"cache",
"BESStoredDapResultCache::store_dap2_result() - Stored Result already exists. Not rewriting file: " << cache_file_name << endl);
781 throw InternalErr(__FILE__, __LINE__,
"BESStoredDapResultCache::store_dap2_result() - Cache error during function invocation.");
784 BESDEBUG(
"cache",
"BESStoredDapResultCache::store_dap2_result() - unlocking and closing cache file "<< cache_file_name << endl );
785 unlock_and_close(cache_file_name);
788 BESDEBUG(
"cache",
"BESStoredDapResultCache::store_dap2_result() - caught exception, unlocking cache and re-throw." << endl );
794 BESDEBUG(
"cache",
"BESStoredDapResultCache::store_dap2_result() - END (local_id=`"<< local_id <<
"')" << endl );
806 BESStoredDapResultCache::get_stored_result_local_id(
const string &dataset,
const string &ce, libdap::DAPVersion version )
808 BESDEBUG(
"cache",
"get_stored_result_local_id() - BEGIN. dataset: " << dataset <<
", ce: " << ce << endl);
809 std::ostringstream ostr;
810 HASH_OBJ<std::string> str_hash;
811 string name = dataset +
"#" + ce;
812 ostr << str_hash(name);
813 string hashed_name = ostr.str();
814 BESDEBUG(
"cache",
"get_stored_result_local_id() - hashed_name: " << hashed_name << endl);
823 suffix =
".data_ddx";
830 throw BESInternalError(
"BESStoredDapResultCache::get_stored_result_local_id() - Unrecognized DAP version!!", __FILE__, __LINE__);
834 BESDEBUG(
"cache",
"get_stored_result_local_id() - Data file suffix: " << suffix << endl);
837 string local_id = d_resultFilePrefix + hashed_name + suffix;
838 BESDEBUG(
"cache",
"get_stored_result_local_id() - file: " << local_id << endl);
840 local_id = assemblePath(d_storedResultsSubdir,local_id);
842 BESDEBUG(
"cache",
"get_stored_result_local_id() - END. local_id: " << local_id << endl);
863 if(local_id.empty()){
864 throw BESInternalError(
"BESStoredDapResultCache: The target cache file name must not be an empty string. Srsly.", __FILE__, __LINE__);
867 string cacheFile = assemblePath(d_dataRootDir,local_id);
869 BESDEBUG(
"cache",
"BESStoredDapResultCache::get_cache_file_name() - local_id: '" << local_id <<
"'" << endl);
870 BESDEBUG(
"cache",
"BESStoredDapResultCache::get_cache_file_name() - cacheDir: '" << cacheFile <<
"'" << endl);
873 BESDEBUG(
"cache",
"[WARNING] BESStoredDapResultCache::get_cache_file_name() - The parameter 'mangle' is ignored!" << endl);
886 BESDEBUG(
"cache",
"BESStoredDapResultCache::store_dap4_result() - BEGIN" << endl );
888 BaseTypeFactory factory;
892 string local_id = get_stored_result_local_id(dmr.filename(), constraint, DAP_4_0);
893 BESDEBUG(
"cache",
"BESStoredDapResultCache::store_dap4_result() - local_id: "<< local_id << endl );
894 string cache_file_name = get_cache_file_name(local_id,
false);
895 BESDEBUG(
"cache",
"BESStoredDapResultCache::store_dap4_result() - cache_file_name: "<< cache_file_name << endl );
902 if (!is_valid(cache_file_name, dmr.filename())){
903 BESDEBUG(
"cache",
"BESStoredDapResultCache::store_dap4_result() - File is not valid. Purging file from cache. filename: " << cache_file_name << endl);
904 purge_file(cache_file_name);
907 if (get_read_lock(cache_file_name, fd)) {
908 BESDEBUG(
"cache",
"BESStoredDapResultCache::store_dap4_result() - Stored Result already exists. Not rewriting file: " << cache_file_name << endl);
910 else if (create_and_lock(cache_file_name, fd)) {
913 BESDEBUG(
"cache",
"BESStoredDapResultCache::store_dap4_result() - cache_file_name: " << cache_file_name <<
", constraint: " << constraint << endl);
915 ofstream data_stream(cache_file_name.c_str());
917 throw InternalErr(__FILE__, __LINE__,
"Could not open '" + cache_file_name +
"' to write cached response.");
928 exclusive_to_shared_lock(fd);
933 unsigned long long size = update_cache_info(cache_file_name);
934 if (cache_too_big(size))
935 update_and_purge(cache_file_name);
939 else if (get_read_lock(cache_file_name, fd)) {
940 BESDEBUG(
"cache",
"BESStoredDapResultCache::store_dap4_result() - Couldn't create and lock file, But I got a read lock. "
941 "Result may have been created by another process. "
942 "Not rewriting file: " << cache_file_name << endl);
945 throw InternalErr(__FILE__, __LINE__,
"BESStoredDapResultCache::store_dap4_result() - Cache error during function invocation.");
953 BESDEBUG(
"cache",
"BESStoredDapResultCache::store_dap4_result() - unlocking and closing cache file "<< cache_file_name << endl );
954 unlock_and_close(cache_file_name);
958 BESDEBUG(
"cache",
"BESStoredDapResultCache::store_dap4_result() - caught exception, unlocking cache and re-throw." << endl );
965 BESDEBUG(
"cache",
"BESStoredDapResultCache::store_dap4_result() - END (local_id=`"<< local_id <<
"')" << endl );
exception thrown if inernal error encountered
static string lowercase(const string &s)
Convert a string to all lower case.
static string assemblePath(const string &firstPart, const string &secondPart, bool addLeadingSlash=false)
virtual string get_cache_file_name(const string &src, bool mangle=false)
Build the name of file that will holds the uncompressed data from 'src' in the cache.
static const string SIZE_KEY
virtual string store_dap4_result(libdap::DMR &dmr, const string &constraint, BESDapResponseBuilder *rb)
libdap::DMR * get_cached_dap4_data(const string &cache_file_name, libdap::D4BaseTypeFactory *factory, const string &filename)
Read data from cache.
static class NCMLUtil overview
virtual string get_message()
get the error message for this exception
static BESStoredDapResultCache * get_instance()
Get the default instance of the BESStoredDapResultCache object.
libdap::DDS * get_cached_dap2_data_ddx(const std::string &cache_file_name, libdap::BaseTypeFactory *factory, const std::string &dataset)
Read data from cache.
static const string SUBDIR_KEY
static bool IsSet(const string &flagName)
see if the debug context flagName is set to true
static const string PREFIX_KEY
void get_value(const string &s, string &val, bool &found)
Retrieve the value of a given key, if set.
virtual void serialize_dap2_data_ddx(std::ostream &out, libdap::DDS &dds, libdap::ConstraintEvaluator &eval, const std::string &boundary, const std::string &start, bool ce_eval=true)
Serialize a DAP3.2 DataDDX to the stream "out".
This class is used to build responses for/by the BES.
#define BESDEBUG(x, y)
macro used to send debug information to the debug stream
static BESKeys * TheKeys()
virtual std::string get_dataset_name() const
The ``dataset name'' is the filename or other string that the filter program will use to access the d...
This class is used to cache DAP2 response objects.
virtual string store_dap2_result(libdap::DDS &dds, const std::string &constraint, BESDapResponseBuilder *rb, libdap::ConstraintEvaluator *eval)
virtual void serialize_dap4_data(std::ostream &out, libdap::DMR &dmr, bool with_mime_headers=true)
Serialize the DAP4 data response to the passed stream.