osgEarth 2.1.1

/home/cube/sources/osgearth/src/osgEarthDrivers/cache_sqlite3/Sqlite3Cache.cpp

Go to the documentation of this file.
00001 /* -*-c++-*- */
00002 /* osgEarth - Dynamic map generation toolkit for OpenSceneGraph
00003  * Copyright 2008-2010 Pelican Mapping
00004  * http://osgearth.org
00005  *
00006  * osgEarth is free software; you can redistribute it and/or modify
00007  * it under the terms of the GNU Lesser General Public License as published by
00008  * the Free Software Foundation; either version 2 of the License, or
00009  * (at your option) any later version.
00010  *
00011  * This program is distributed in the hope that it will be useful,
00012  * but WITHOUT ANY WARRANTY; without even the implied warranty of
00013  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
00014  * GNU Lesser General Public License for more details.
00015  *
00016  * You should have received a copy of the GNU Lesser General Public License
00017  * along with this program.  If not, see <http://www.gnu.org/licenses/>
00018  */
00019 #include "Sqlite3CacheOptions"
00020 
00021 #include <osgEarth/FileUtils>
00022 #include <osgEarth/TaskService>
00023 #include <osgDB/FileNameUtils>
00024 #include <osgDB/FileUtils>
00025 #include <osgDB/ReaderWriter>
00026 #if OSG_MIN_VERSION_REQUIRED(2,9,5)
00027 #  include <osgDB/Options>
00028 #endif
00029 #include <OpenThreads/Mutex>
00030 #include <OpenThreads/ScopedLock>
00031 #include <cstring>
00032 #include <fstream>
00033 
00034 // for the compressor stuff
00035 #if OSG_MIN_VERSION_REQUIRED(2,9,8)
00036 #  define USE_SERIALIZERS
00037 #  include <osgDB/Serializer>
00038 #endif
00039 
00040 #include <sqlite3.h>
00041 
00042 using namespace osgEarth;
00043 using namespace osgEarth::Drivers;
00044 using namespace OpenThreads;
00045 
00046 #define LC "[Sqlite3Cache] "
00047 
00048 #define USE_TRANSACTIONS
00049 #define USE_L2_CACHE
00050 
00051 //#define SPLIT_DB_FILE
00052 //#define SPLIT_LAYER_DB
00053 #define UPDATE_ACCESS_TIMES
00054 #define UPDATE_ACCESS_TIMES_POOL
00055 #define MAX_REQUEST_TO_RUN_PURGE 100
00056 
00057 #define PURGE_GENERAL
00058 //#define INSERT_POOL
00059 
00060 //#define MONITOR_THREAD_HEALTH
00061 
00062 // --------------------------------------------------------------------------
00063 
00064 // opens a database connection with default settings.
00065 static
00066 sqlite3* openDatabase( const std::string& path, bool serialized )
00067 {
00068     //Try to create the path if it doesn't exist
00069     std::string dirPath = osgDB::getFilePath(path);    
00070 
00071     //If the path doesn't currently exist or we can't create the path, don't cache the file
00072     if (!osgDB::fileExists(dirPath) && !osgDB::makeDirectory(dirPath))
00073     {
00074         OE_WARN << LC << "Couldn't create path " << dirPath << std::endl;
00075     }
00076 
00077     sqlite3* db = 0L;
00078 
00079     // not sure if SHAREDCACHE is necessary or wise 
00080     int flags = SQLITE_OPEN_READWRITE | SQLITE_OPEN_CREATE;
00081     flags |= serialized ? SQLITE_OPEN_FULLMUTEX : SQLITE_OPEN_NOMUTEX;
00082 
00083     int rc = sqlite3_open_v2( path.c_str(), &db, flags, 0L );
00084 
00085     if ( rc != 0 )
00086     {
00087         OE_WARN << LC << "Failed to open cache \"" << path << "\": " << sqlite3_errmsg(db) << std::endl;
00088         return 0L;
00089     }
00090 
00091     // make sure that writes actually finish
00092     sqlite3_busy_timeout( db, 60000 );
00093 
00094     return db;
00095 }
00096 
00097 // --------------------------------------------------------------------------
00098 
00099 // a slightly customized Cache class that will support asynchronous writes
00100 struct AsyncCache : public Cache
00101 {
00102 public:
00103     AsyncCache(const CacheOptions& options =CacheOptions()): Cache(options) { }
00104     virtual void setImageSync(
00105         const TileKey& key,
00106         const CacheSpec& spec,
00107         const osg::Image* image ) =0;
00108 };
00109 
00110 
00111 // --------------------------------------------------------------------------
00112 
00113 struct MetadataRecord
00114 {
00115     std::string   _layerName;
00116     std::string   _format;
00117     int           _tileSize;
00118     osg::ref_ptr<const Profile> _profile;
00119     std::string   _compressor;
00120 };
00121 
00125 struct MetadataTable
00126 {
00127     MetadataTable() { }
00128 
00129     bool initialize( sqlite3* db )
00130     {
00131         std::string sql =
00132             "CREATE TABLE IF NOT EXISTS metadata ("
00133             "layer varchar(255) PRIMARY KEY UNIQUE, "
00134             "format varchar(255), "
00135             "compressor varchar(64), "
00136             "tilesize int, "
00137             "srs varchar(1024), "
00138             "xmin double, "
00139             "ymin double, "
00140             "xmax double, "
00141             "ymax double, "
00142             "tw int, "
00143             "th int )";
00144 
00145         OE_DEBUG << LC << "SQL = " << sql << std::endl;
00146 
00147         char* errMsg = 0L;
00148         int err = sqlite3_exec( db, sql.c_str(), 0L, 0L, &errMsg );
00149         if ( err != SQLITE_OK )
00150         {
00151             OE_WARN << LC << "[Sqlite3Cache] Creating metadata: " << errMsg << std::endl;
00152             sqlite3_free( errMsg );
00153             return false;
00154         }
00155 
00156         // prep the insert/select statement SQL strings:
00157         _insertSQL = 
00158             "INSERT OR REPLACE INTO metadata "
00159             "(layer,format,compressor,tilesize,srs,xmin,ymin,xmax,ymax,tw,th) "
00160             "VALUES (?,?,?,?,?,?,?,?,?,?,?)";
00161 
00162         _selectSQL =
00163             "SELECT layer,format,compressor,tilesize,srs,xmin,ymin,xmax,ymax,tw,th "
00164             "FROM metadata WHERE layer = ?";
00165 
00166         return true;
00167     }
00168 
00169     bool store( const MetadataRecord& rec, sqlite3* db )
00170     {
00171         sqlite3_stmt* insert = 0;
00172         int rc = sqlite3_prepare_v2( db, _insertSQL.c_str(), _insertSQL.length(), &insert, 0L );
00173         if ( rc != SQLITE_OK )
00174         {
00175             OE_WARN 
00176                 << LC << "Error preparing SQL: " 
00177                 << sqlite3_errmsg( db )
00178                 << "(SQL: " << _insertSQL << ")"
00179                 << std::endl;
00180             return false;
00181         }
00182 
00183         sqlite3_bind_text( insert, 1, rec._layerName.c_str(), -1, 0L );
00184         sqlite3_bind_text( insert, 2, rec._format.c_str(), -1, 0L );
00185         sqlite3_bind_text( insert, 3, rec._compressor.c_str(), -1, 0L );
00186         sqlite3_bind_int ( insert, 4, rec._tileSize );
00187         sqlite3_bind_text( insert, 5, rec._profile->getSRS()->getInitString().c_str(), -1, 0L );
00188         sqlite3_bind_double( insert, 6, rec._profile->getExtent().xMin() );
00189         sqlite3_bind_double( insert, 7, rec._profile->getExtent().yMin() );
00190         sqlite3_bind_double( insert, 8, rec._profile->getExtent().xMax() );
00191         sqlite3_bind_double( insert, 9, rec._profile->getExtent().yMax() );
00192         unsigned int tw, th;
00193         rec._profile->getNumTiles( 0, tw, th );
00194         sqlite3_bind_int( insert, 10, tw );
00195         sqlite3_bind_int( insert, 11, th );
00196 
00197         bool success;
00198 
00199         rc = sqlite3_step(insert);
00200         if ( rc != SQLITE_DONE )
00201         {
00202             OE_WARN << LC << "SQL INSERT failed: " << sqlite3_errmsg( db )
00203                 << "; SQL = " << _insertSQL
00204                 << std::endl;
00205             success = false;
00206         }
00207         else
00208         {
00209             OE_DEBUG << LC << "Stored metadata record for \"" << rec._layerName << "\"" << std::endl;
00210             success = true;
00211         }
00212 
00213         sqlite3_finalize( insert );
00214         return success;
00215     }
00216 
00217     bool load( const std::string& key, sqlite3* db, MetadataRecord& output )
00218     {
00219         bool success = true;
00220 
00221         sqlite3_stmt* select = 0L;
00222         int rc = sqlite3_prepare_v2( db, _selectSQL.c_str(), _selectSQL.length(), &select, 0L );
00223         if ( rc != SQLITE_OK )
00224         {
00225             OE_WARN 
00226                 << LC << "Error preparing SQL: " 
00227                 << sqlite3_errmsg( db )
00228                 << "(SQL: " << _insertSQL << ")"
00229                 << std::endl;
00230             return false;
00231         }
00232 
00233         sqlite3_bind_text( select, 1, key.c_str(), -1, 0L );
00234 
00235         rc = sqlite3_step( select );
00236         if ( rc == SQLITE_ROW ) 
00237         {
00238             // got a result            
00239             output._layerName = (char*)sqlite3_column_text( select, 0 );
00240             output._format = (char*)sqlite3_column_text( select, 1 );
00241             output._compressor = (char*)sqlite3_column_text( select, 2 );
00242             output._tileSize = sqlite3_column_int( select, 3 );
00243             ProfileOptions pconf;
00244             pconf.srsString() = (char*)sqlite3_column_text( select, 4 );
00245             pconf.bounds() = Bounds(
00246                 sqlite3_column_double( select, 5 ),
00247                 sqlite3_column_double( select, 6 ),
00248                 sqlite3_column_double( select, 7 ),
00249                 sqlite3_column_double( select, 8 ) );
00250             pconf.numTilesWideAtLod0() = sqlite3_column_int( select, 9 );
00251             pconf.numTilesHighAtLod0() = sqlite3_column_int( select, 10 );
00252             output._profile = Profile::create( pconf );
00253             success = true;
00254         }
00255         else
00256         {
00257             // no result
00258             OE_DEBUG << "NO metadata record found for \"" << key << "\"" << std::endl;
00259             success = false;
00260         }
00261 
00262         sqlite3_finalize( select );
00263         return success;
00264     }
00265 
00266 
00267     bool loadAllLayers( sqlite3* db, std::vector<std::string>& output )
00268     {
00269         bool success = true;
00270 
00271         sqlite3_stmt* select = 0L;
00272         std::string selectLayersSQL = "select layer from \"metadata\"";
00273         int rc = sqlite3_prepare_v2( db, selectLayersSQL.c_str(), selectLayersSQL.length(), &select, 0L );
00274         if ( rc != SQLITE_OK )
00275         {
00276             OE_WARN 
00277                 << LC << "Error preparing SQL: " 
00278                 << sqlite3_errmsg( db )
00279                 << "(SQL: " << _insertSQL << ")"
00280                 << std::endl;
00281             return false;
00282         }
00283 
00284         success = true;
00285         rc = sqlite3_step( select );
00286         while (rc == SQLITE_ROW) {
00287             output.push_back((char*)sqlite3_column_text( select, 0 ));
00288             rc = sqlite3_step( select );
00289         }
00290 
00291         if (rc != SQLITE_DONE)
00292         {
00293             // no result
00294             OE_WARN << "NO layers found in metadata" << std::endl;
00295             success = false;
00296         }
00297 
00298         sqlite3_finalize( select );
00299         return success;
00300     }
00301 
00302     std::string _insertSQL;
00303     std::string _selectSQL;
00304  };
00305 
00306 // --------------------------------------------------------------------------
00307 
00308 struct ImageRecord
00309 {
00310     ImageRecord( const TileKey& key ) : _key(key) { }
00311     TileKey _key;
00312     int _created;
00313     int _accessed;
00314     osg::ref_ptr<const osg::Image> _image;
00315 };
00316 
00317 #ifdef INSERT_POOL
00318 class Sqlite3Cache;
00319 struct AsyncInsertPool : public TaskRequest {
00320     struct Entry {
00321         TileKey _key;
00322         osg::ref_ptr<osg::Image> _image;
00323         std::string _format;
00324         Entry() {}
00325         Entry(const TileKey& key, const std::string& format, osg::Image* img) : _key(key), _format(format), _image(img) {}
00326     };
00327 
00328     typedef std::map<std::string, Entry> PoolContainer;
00329 
00330     AsyncInsertPool(const std::string& layerName, Sqlite3Cache* cache );
00331 
00332     void addEntry( const TileKey& key, const std::string& format, osg::Image* image)
00333     {
00334         const std::string& keyStr = key.str();
00335         if (_pool.find(keyStr) != _pool.end())
00336             return;
00337         _pool[keyStr] = Entry(key, format, image);
00338     }
00339 
00340     osg::Image* findImage(const std::string& key)
00341     {
00342         PoolContainer::iterator it = _pool.find(key);
00343         if (it != _pool.end()) {
00344             return it->second._image.get();
00345         }
00346         return 0;
00347     }
00348 
00349     void operator()( ProgressCallback* progress );
00350     
00351     PoolContainer _pool;
00352     std::string _layerName;
00353     osg::observer_ptr<Sqlite3Cache> _cache;
00354 };
00355 #endif
00356 
00361 struct LayerTable : public osg::Referenced
00362 {
00363     LayerTable( const MetadataRecord& meta, sqlite3* db )
00364         : _meta(meta)
00365     {        
00366         _tableName = "layer_" + _meta._layerName;
00367         // create the table and load the processors.
00368         if ( ! initialize( db ) )
00369         {
00370             return;
00371         }
00372 
00373         // initialize the SELECT statement for fetching records
00374         std::stringstream buf;
00375 #ifdef SPLIT_DB_FILE
00376         buf << "SELECT created,accessed,size FROM \"" << tableName << "\" WHERE key = ?";
00377 #else
00378         buf << "SELECT created,accessed,data FROM \"" << _tableName << "\" WHERE key = ?";
00379 #endif
00380         _selectSQL = buf.str();
00381 
00382         // initialize the UPDATE statement for updating the timestamp of an accessed record
00383         buf.str("");
00384         buf << "UPDATE \"" << _tableName << "\" SET accessed = ? "
00385             << "WHERE key = ?";
00386         _updateTimeSQL = buf.str();
00387 
00388         buf.str("");
00389         buf << "UPDATE \"" << _tableName << "\" SET accessed = ? "
00390             << "WHERE key in ( ? )";
00391         _updateTimePoolSQL = buf.str();
00392         
00393         // initialize the INSERT statement for writing records.
00394         buf.str("");
00395         buf << "INSERT OR REPLACE INTO \"" << _tableName << "\" "
00396 #ifdef SPLIT_DB_FILE
00397             << "(key,created,accessed,size) VALUES (?,?,?,?)";
00398 #else
00399             << "(key,created,accessed,data) VALUES (?,?,?,?)";
00400 #endif
00401         _insertSQL = buf.str();
00402 
00403         // initialize the DELETE statements for purging old records.
00404         buf.str("");
00405         buf << "DELETE FROM \"" << _tableName << "\" "
00406             << "INDEXED BY \"" << _tableName << "_lruindex\" "
00407             << "WHERE accessed < ?";
00408         _purgeSQL = buf.str();
00409         
00410         buf.str("");
00411         buf << "DELETE FROM \""  << _tableName << "\" WHERE key in (SELECT key FROM \"" << _tableName << "\" WHERE \"accessed\" < ? limit ?)";
00412         _purgeLimitSQL = buf.str();          
00413 
00414         buf.str("");
00415         buf << "SELECT key FROM \"" << _tableName << "\" WHERE \"accessed\" < ? limit ?";
00416         _purgeSelect = buf.str();
00417 
00418         _statsLoaded = 0;
00419         _statsStored = 0;
00420         _statsDeleted = 0;
00421     }
00422 
00423 
00424     sqlite3_int64 getTableSize(sqlite3* db)
00425     {
00426 #ifdef SPLIT_DB_FILE
00427         std::string query = "select sum(size) from \"" + _tableName + "\";";
00428 #else
00429         std::string query = "select sum(length(data)) from \"" + _tableName + "\";";
00430 #endif
00431         sqlite3_stmt* select = 0L;
00432         int rc = sqlite3_prepare_v2( db, query.c_str(), query.length(), &select, 0L );
00433         if ( rc != SQLITE_OK )
00434         {
00435             OE_WARN << LC << "Failed to prepare SQL: " << query << "; " << sqlite3_errmsg(db) << std::endl;
00436             return -1;
00437         }
00438 
00439         rc = sqlite3_step( select );
00440         if ( rc != SQLITE_ROW)
00441         {
00442             OE_WARN << LC << "SQL QUERY failed for " << query << ": " 
00443                 << sqlite3_errmsg( db ) //<< "; tries=" << (1000-tries)
00444                 << ", rc = " << rc << std::endl;
00445             sqlite3_finalize( select );
00446             return -1;
00447         }
00448         sqlite3_int64 size = sqlite3_column_int(select, 0);
00449         sqlite3_finalize( select );
00450         return size;
00451     }
00452 
00453     int getNbEntry(sqlite3* db) {
00454         std::string query = "select count(*) from \"" + _tableName + "\";";
00455         sqlite3_stmt* select = 0L;
00456         int rc = sqlite3_prepare_v2( db, query.c_str(), query.length(), &select, 0L );
00457         if ( rc != SQLITE_OK )
00458         {
00459             OE_WARN << LC << "Failed to prepare SQL: " << query << "; " << sqlite3_errmsg(db) << std::endl;
00460             return -1;
00461         }
00462 
00463         rc = sqlite3_step( select );
00464         if ( rc != SQLITE_ROW)
00465         {
00466             OE_WARN << LC << "SQL QUERY failed for " << query << ": " 
00467                 << sqlite3_errmsg( db ) //<< "; tries=" << (1000-tries)
00468                 << ", rc = " << rc << std::endl;
00469             sqlite3_finalize( select );
00470             return -1;
00471         }
00472         int nbItems = sqlite3_column_int(select, 0);
00473         sqlite3_finalize( select );
00474         return nbItems;
00475     }
00476 
00477     void checkAndPurgeIfNeeded(sqlite3* db, unsigned int maxSize )
00478     {
00479         int size = getTableSize(db);
00480         OE_DEBUG << _meta._layerName <<  std::dec << " : "  << size/1024/1024 << " MB" << std::endl;
00481         if (size < 0 || size < 1.2 * maxSize)
00482             return;
00483             
00484         ::time_t t = ::time(0L);
00485         int nbElements = getNbEntry(db);
00486         float averageSize = size * 1.0 / nbElements;
00487         float diffSize = size - maxSize;
00488         int maxElementToRemove = static_cast<int>(ceil(diffSize/averageSize));
00489         OE_DEBUG << _meta._layerName <<  " try to remove " << std::dec << maxElementToRemove << " / " <<  nbElements << " to save place" << std::endl;
00490         purge(t, maxElementToRemove, db);
00491     }
00492 
00493     bool store( const ImageRecord& rec, sqlite3* db )
00494     {
00495         displayStats();
00496 
00497         sqlite3_stmt* insert = 0L;
00498         int rc = sqlite3_prepare_v2( db, _insertSQL.c_str(), _insertSQL.length(), &insert, 0L );
00499         if ( rc != SQLITE_OK )
00500         {
00501             OE_WARN 
00502                 << LC << "Error preparing SQL: " 
00503                 << sqlite3_errmsg( db )
00504                 << "(SQL: " << _insertSQL << ")"
00505                 << std::endl;
00506             return false;
00507         }
00508 
00509         // bind the key string:
00510         std::string keyStr = rec._key.str();
00511         sqlite3_bind_text( insert, 1, keyStr.c_str(), keyStr.length(), SQLITE_STATIC );
00512         sqlite3_bind_int(  insert, 2, rec._created );
00513         sqlite3_bind_int(  insert, 3, rec._accessed );
00514 
00515         // serialize the image:
00516 #ifdef SPLIT_DB_FILE
00517         std::stringstream outStream;
00518         _rw->writeImage( *rec._image.get(), outStream, _rwOptions.get() );
00519         std::string outBuf = outStream.str();
00520         std::string fname = _meta._layerName + "_" + keyStr+".osgb";
00521         {
00522             std::ofstream file(fname.c_str(), std::ios::out | std::ios::binary);
00523             if (file.is_open()) {
00524                 file.write(outBuf.c_str(), outBuf.length());
00525             }
00526         }
00527         sqlite3_bind_int( insert, 4, outBuf.length() );
00528 #else
00529         std::stringstream outStream;
00530         _rw->writeImage( *rec._image.get(), outStream, _rwOptions.get() );
00531         std::string outBuf = outStream.str();
00532         sqlite3_bind_blob( insert, 4, outBuf.c_str(), outBuf.length(), SQLITE_STATIC );
00533 #endif
00534 
00535         // write to the database:
00536         rc = sqlite3_step( insert );
00537 
00538         if ( rc != SQLITE_DONE )
00539         {
00540             OE_WARN << LC << "SQL INSERT failed for key " << rec._key.str() << ": " 
00541                 << sqlite3_errmsg( db ) //<< "; tries=" << (1000-tries)
00542                 << ", rc = " << rc << std::endl;
00543             sqlite3_finalize( insert );
00544             return false;
00545         }
00546         else
00547         {
00548             OE_DEBUG << LC << "cache INSERT tile " << rec._key.str() << std::endl;
00549             sqlite3_finalize( insert );
00550             _statsStored++;
00551             return true;
00552         }
00553     }
00554 
00555 #ifdef INSERT_POOL
00556     bool beginStore(sqlite3* db, sqlite3_stmt*& insert) 
00557     {
00558         int rc;
00559         rc = sqlite3_exec( db, "BEGIN TRANSACTION", NULL, NULL, NULL);
00560         if (rc != 0) {
00561             OE_WARN << LC << "Failed to begin transaction batch of insert for " << _meta._layerName << " "  << sqlite3_errmsg(db) << std::endl;
00562             return false;
00563         }
00564 
00565         // prepare for multiple inserts
00566         rc = sqlite3_prepare_v2( db, _insertSQL.c_str(), _insertSQL.length(), &insert, 0L );
00567         if ( rc != SQLITE_OK )
00568         {
00569             OE_WARN 
00570                 << LC << "Error preparing SQL: " 
00571                 << sqlite3_errmsg( db )
00572                 << "(SQL: " << _insertSQL << ")"
00573                 << std::endl;
00574             return false;
00575         }
00576         return true;
00577     }
00578 
00579     bool storePool(sqlite3* db, const AsyncInsertPool::PoolContainer& entries, sqlite3_stmt* insert)
00580     {
00581         ::time_t t = ::time(0L);
00582         sqlite3_bind_int(  insert, 2, t );
00583         sqlite3_bind_int(  insert, 3, t );
00584         int rc;
00585         for (AsyncInsertPool::PoolContainer::const_iterator it = entries.begin(); it != entries.end(); ++it) {
00586             
00587             // bind the key string:
00588             std::string keyStr = it->first;
00589             sqlite3_bind_text( insert, 1, keyStr.c_str(), keyStr.length(), SQLITE_STATIC );
00590 
00591             // serialize the image:
00592 #ifdef SPLIT_DB_FILE
00593             std::stringstream outStream;
00594             _rw->writeImage( * (it)->second._image.get(), outStream, _rwOptions.get() );
00595             std::string outBuf = outStream.str();
00596             std::string fname = _meta._layerName + "_" + keyStr+".osgb";
00597             {
00598                 std::ofstream file(fname.c_str(), std::ios::out | std::ios::binary);
00599                 if (file.is_open()) {
00600                     file.write(outBuf.c_str(), outBuf.length());
00601                 }
00602             }
00603             sqlite3_bind_int( insert, 4, outBuf.length() );
00604 #else
00605             std::stringstream outStream;
00606             _rw->writeImage( * (it)->second._image.get(), outStream, _rwOptions.get() );
00607             std::string outBuf = outStream.str();
00608             sqlite3_bind_blob( insert, 4, outBuf.c_str(), outBuf.length(), SQLITE_STATIC );
00609 #endif
00610             rc = sqlite3_step(insert);   // executes the INSERT
00611             if ( rc != SQLITE_DONE )
00612             {
00613                 OE_WARN << LC << "SQL INSERT failed for key " << keyStr << ": " 
00614                         << sqlite3_errmsg( db ) //<< "; tries=" << (1000-tries)
00615                         << ", rc = " << rc << std::endl;
00616                 sqlite3_finalize(insert); // clean up prepared statement
00617                 return false;
00618             }
00619             sqlite3_reset(insert);  // reset the statement 
00620         }
00621 
00622         _statsStored += entries.size();
00623         return true;
00624     }
00625 
00626     bool endStore(sqlite3* db, sqlite3_stmt* insert)
00627     {
00628         int rc = 0;
00629         rc = sqlite3_finalize(insert); // clean up prepared statement
00630         rc = sqlite3_exec( db, "COMMIT", NULL, NULL, NULL);  // COMMIT all dirty pages at once
00631         if (rc != 0) {
00632             OE_WARN << LC << "Failed to commit batch of insert for " << _meta._layerName << " "  << sqlite3_errmsg(db) << std::endl;
00633             return false;
00634         }
00635         
00636         return true;
00637     }
00638 #endif
00639 
00640     bool updateAccessTime( const TileKey& key, int newTimestamp, sqlite3* db )
00641     { 
00642         sqlite3_stmt* update = 0L;
00643         int rc = sqlite3_prepare_v2( db, _updateTimeSQL.c_str(), _updateTimeSQL.length(), &update, 0L );
00644         if ( rc != SQLITE_OK )
00645         {
00646             OE_WARN << LC << "Failed to prepare SQL " << _updateTimeSQL << "; " << sqlite3_errmsg(db) << std::endl;
00647             return false;
00648         }
00649 
00650         bool success = true;
00651         sqlite3_bind_int( update, 1, newTimestamp );
00652         std::string keyStr = key.str();
00653         sqlite3_bind_text( update, 2, keyStr.c_str(), keyStr.length(), SQLITE_STATIC );
00654         rc = sqlite3_step( update );
00655         if ( rc != SQLITE_DONE )
00656         {
00657             OE_WARN << LC << "Failed to update timestamp for " << key.str() << " on layer " << _meta._layerName << " rc = " << rc << std::endl;
00658             success = false;
00659         }
00660 
00661         sqlite3_finalize( update );
00662         return success;
00663     }
00664 
00665     bool updateAccessTimePool( const std::string&  keyStr, int newTimestamp, sqlite3* db )
00666     {
00667         //OE_WARN << LC << "update access times " << _meta._layerName << " " << keyStr << std::endl;
00668         sqlite3_stmt* update = 0L;
00669         int rc = sqlite3_prepare_v2( db, _updateTimePoolSQL.c_str(), _updateTimePoolSQL.length(), &update, 0L );
00670         if ( rc != SQLITE_OK )
00671         {
00672             OE_WARN << LC << "Failed to prepare SQL " << _updateTimePoolSQL << "; " << sqlite3_errmsg(db) << std::endl;
00673             return false;
00674         }
00675 
00676         bool success = true;
00677         sqlite3_bind_int( update, 1, newTimestamp );
00678         sqlite3_bind_text( update, 2, keyStr.c_str(), keyStr.length(), SQLITE_STATIC );
00679         rc = sqlite3_step( update );
00680         if ( rc != SQLITE_DONE )
00681         {
00682             OE_WARN << LC << "Failed to update timestamp for " << keyStr << " on layer " << _meta._layerName << " rc = " << rc << std::endl;
00683             success = false;
00684         }
00685 
00686         sqlite3_finalize( update );
00687         return success;
00688     }
00689 
00690     bool load( const TileKey& key, ImageRecord& output, sqlite3* db )
00691     {
00692         displayStats();
00693         int imageBufLen = 0;
00694         
00695         sqlite3_stmt* select = 0L;
00696         int rc = sqlite3_prepare_v2( db, _selectSQL.c_str(), _selectSQL.length(), &select, 0L );
00697         if ( rc != SQLITE_OK )
00698         {
00699             OE_WARN << LC << "Failed to prepare SQL: " << _selectSQL << "; " << sqlite3_errmsg(db) << std::endl;
00700             return false;
00701         }
00702 
00703         std::string keyStr = key.str();
00704         sqlite3_bind_text( select, 1, keyStr.c_str(), keyStr.length(), SQLITE_STATIC );
00705 
00706         rc = sqlite3_step( select );
00707         if ( rc != SQLITE_ROW ) // == SQLITE_DONE ) // SQLITE_DONE means "no more rows"
00708         {
00709             // cache miss
00710             OE_DEBUG << LC << "Cache MISS on tile " << key.str() << std::endl;
00711             sqlite3_finalize(select);
00712             return false;
00713         }
00714 
00715         // copy the timestamps:
00716         output._created  = sqlite3_column_int( select, 0 );
00717         output._accessed = sqlite3_column_int( select, 1 );
00718 
00719 #ifdef SPLIT_DB_FILE
00720         std::string fname(keyStr);
00721         osgDB::ReaderWriter::ReadResult rr = _rw->readImage( _meta._layerName + "_" +fname+".osgb" );
00722 #else
00723         // the pointer returned from _blob gets freed internally by sqlite, supposedly
00724         const char* data = (const char*)sqlite3_column_blob( select, 2 );
00725         imageBufLen = sqlite3_column_bytes( select, 2 );
00726 
00727         // deserialize the image from the buffer:
00728         std::string imageString( data, imageBufLen );
00729         std::stringstream imageBufStream( imageString );
00730         osgDB::ReaderWriter::ReadResult rr = _rw->readImage( imageBufStream );
00731 #endif
00732         if ( rr.error() )
00733         {
00734             OE_WARN << LC << "Failed to read image from database: " << rr.message() << std::endl;
00735         }
00736         else
00737         {
00738             output._image = rr.takeImage();
00739             output._key = key;
00740             OE_DEBUG << LC << "Cache HIT on tile " << key.str() << std::endl;
00741         }
00742 
00743         sqlite3_finalize(select);
00744 
00745         _statsLoaded++;
00746         return output._image.valid();
00747     }
00748 
00749     void displayStats()
00750     {
00751         osg::Timer_t t = osg::Timer::instance()->tick();
00752         if (osg::Timer::instance()->delta_s( _statsLastCheck, t) > 10.0) {
00753             double d = osg::Timer::instance()->delta_s(_statsStartTimer, t);
00754             OE_DEBUG << _meta._layerName << " time " << d << " stored " << std::dec << _statsStored << " rate " << _statsStored * 1.0 / d << std::endl;
00755             OE_DEBUG << _meta._layerName << " time " << d << " loaded " << std::dec  << _statsLoaded << " rate " << _statsLoaded * 1.0 / d << std::endl;
00756             OE_DEBUG << _meta._layerName << " time " << d << " deleted " << std::dec  << _statsDeleted << " rate " << _statsDeleted * 1.0 / d << std::endl;
00757             _statsLastCheck = t;
00758         }
00759     }
00760 
00761     bool purge( int utcTimeStamp, int maxToRemove, sqlite3* db )
00762     {
00763         if ( maxToRemove < 0 )
00764             return false;
00765 
00766         sqlite3_stmt* purge = 0L;
00767         
00768         int rc;
00769         {
00770 #ifdef SPLIT_DB_FILE
00771             {
00772                 std::vector<std::string> deleteFiles;
00773                 sqlite3_stmt* selectPurge = 0L;
00774                 rc = sqlite3_prepare_v2( db, _purgeSelect.c_str(), _purgeSelect.length(), &selectPurge, 0L);
00775                 if ( rc != SQLITE_OK )
00776                 {
00777                     OE_WARN << LC << "Failed to prepare SQL: " << _purgeSelect << "; " << sqlite3_errmsg(db) << std::endl;
00778                     return false;
00779                 }
00780                 sqlite3_bind_int( selectPurge, 2, maxToRemove );
00781                 sqlite3_bind_int( selectPurge, 1, utcTimeStamp );
00782 
00783                 rc = sqlite3_step( selectPurge );
00784                 if ( rc != SQLITE_ROW && rc != SQLITE_DONE)
00785                 {
00786                     OE_WARN << LC << "SQL QUERY failed for " << _purgeSelect << ": " 
00787                             << sqlite3_errmsg( db ) //<< "; tries=" << (1000-tries)
00788                             << ", rc = " << rc << std::endl;
00789                     sqlite3_finalize( selectPurge );
00790                     return false;
00791                 }
00792                 while (rc == SQLITE_ROW) {
00793                     std::string f((const char*)sqlite3_column_text( selectPurge, 0 ));
00794                     deleteFiles.push_back(f);
00795                     rc = sqlite3_step( selectPurge );
00796                 }
00797                 if (rc != SQLITE_DONE) {
00798                     OE_WARN << LC << "SQL QUERY failed for " << _purgeSelect << ": " 
00799                             << sqlite3_errmsg( db ) //<< "; tries=" << (1000-tries)
00800                             << ", rc = " << rc << std::endl;
00801                     sqlite3_finalize( selectPurge );
00802                     return false;
00803                 }
00804                 sqlite3_finalize( selectPurge );
00805                 while (!deleteFiles.empty()) {
00806                     std::string fname = _meta._layerName + "_" + deleteFiles.back() +".osgb";
00807                     int run = unlink(fname.c_str());
00808                     if (run) {
00809                         OE_WARN << "Error while removing file " << fname << std::endl;
00810                     }
00811                     deleteFiles.pop_back();
00812                 }
00813             }
00814 #endif
00815             rc = sqlite3_prepare_v2( db, _purgeLimitSQL.c_str(), _purgeLimitSQL.length(), &purge, 0L );
00816             if ( rc != SQLITE_OK )
00817             {
00818                 OE_WARN << LC << "Failed to prepare SQL: " << _purgeLimitSQL << "; " << sqlite3_errmsg(db) << std::endl;
00819                 return false;
00820             }
00821             sqlite3_bind_int( purge, 2, maxToRemove );
00822         }
00823 
00824         sqlite3_bind_int( purge, 1, utcTimeStamp );
00825 
00826         rc = sqlite3_step( purge );
00827         if ( rc != SQLITE_DONE )
00828         {
00829             // cache miss
00830             OE_DEBUG << LC << "Error purging records from \"" << _meta._layerName << "\"; " << sqlite3_errmsg(db) << std::endl;
00831             sqlite3_finalize(purge);
00832             return false;
00833         }
00834 
00835         sqlite3_finalize(purge);
00836         _statsDeleted += maxToRemove;
00837         return true;
00838     }
00839 
00843     bool initialize( sqlite3* db )
00844     {
00845         // first create the table if it does not already exist:
00846         std::stringstream buf;
00847         buf << "CREATE TABLE IF NOT EXISTS \"" << _tableName << "\" ("
00848             << "key char(64) PRIMARY KEY UNIQUE, "
00849             << "created int, "
00850             << "accessed int, "
00851 #ifdef SPLIT_DB_FILE
00852             << "size int )";
00853 #else
00854             << "data blob )";
00855 #endif
00856         std::string sql = buf.str();
00857 
00858         OE_DEBUG << LC << "SQL = " << sql << std::endl;
00859 
00860         char* errMsg = 0L;
00861         int rc = sqlite3_exec( db, sql.c_str(), 0L, 0L, &errMsg );
00862         if ( rc != SQLITE_OK )
00863         {
00864             OE_WARN << LC << "Creating layer \"" << _meta._layerName << "\": " << errMsg << std::endl;
00865             sqlite3_free( errMsg );
00866             return false;
00867         }
00868 
00869         // create an index on the time-last-accessed column
00870         buf.str("");
00871         buf << "CREATE INDEX IF NOT EXISTS \"" 
00872             << _tableName << "_lruindex\" "
00873             << "ON \"" << _tableName << "\" (accessed)";
00874         sql = buf.str();
00875 
00876         OE_DEBUG << LC << "SQL = " << sql << std::endl;
00877 
00878         rc = sqlite3_exec( db, sql.c_str(), 0L, 0L, &errMsg );
00879         if ( rc != SQLITE_OK )
00880         {
00881             OE_WARN << LC << "Creating index for layer \"" << _meta._layerName << "\": " << errMsg << std::endl;
00882             sqlite3_free( errMsg );
00883             //return false;
00884         }
00885 
00886         // next load the appropriate ReaderWriter:
00887 
00888 #if OSG_MIN_VERSION_REQUIRED(2,9,5)
00889         _rw = osgDB::Registry::instance()->getReaderWriterForMimeType( _meta._format );
00890         if ( !_rw.valid() )
00891 #endif
00892             _rw = osgDB::Registry::instance()->getReaderWriterForExtension( _meta._format );
00893         if ( !_rw.valid() )
00894         {
00895             OE_WARN << LC << "Creating layer: Cannot initialize ReaderWriter for format \"" 
00896                 << _meta._format << "\"" << std::endl;
00897             return false;
00898         }
00899 
00900         if ( !_meta._compressor.empty() )
00901             _rwOptions = new osgDB::ReaderWriter::Options( "Compressor=" + _meta._compressor );
00902 
00903         _statsLastCheck = _statsStartTimer = osg::Timer::instance()->tick();
00904         return true;
00905     }
00906 
00907     std::string _selectSQL;
00908     std::string _insertSQL;
00909     std::string _updateTimeSQL;
00910     std::string _updateTimePoolSQL;
00911  
00912     std::string _purgeSelect;
00913     std::string _purgeSQL;
00914     std::string _purgeLimitSQL;
00915     MetadataRecord _meta;
00916     std::string _tableName;
00917 
00918     osg::ref_ptr<osgDB::ReaderWriter> _rw;
00919     osg::ref_ptr<osgDB::ReaderWriter::Options> _rwOptions;
00920 
00921     osg::Timer_t _statsStartTimer;
00922     osg::Timer_t _statsLastCheck;
00923 
00924     int _statsLoaded;
00925     int _statsStored;
00926     int _statsDeleted;
00927 
00928 };
00929 
00930 // --------------------------------------------------------------------------
00931 
00932 typedef std::map<std::string,osg::ref_ptr<LayerTable> > LayerTablesByName;
00933 
00934 // --------------------------------------------------------------------------
00935 
00936 //TODO: might want to move this up out of this plugin at some point.
00937 
00938 struct AsyncPurge : public TaskRequest {
00939     AsyncPurge( const std::string& layerName, int olderThanUTC, Cache* cache )
00940         : _layerName(layerName), _olderThanUTC(olderThanUTC), _cache(cache) { }
00941 
00942     void operator()( ProgressCallback* progress ) { 
00943         osg::ref_ptr<Cache> cache = _cache.get();
00944         if ( cache.valid() )
00945             cache->purge( _layerName, _olderThanUTC, false );
00946     }
00947 
00948     std::string _layerName;
00949     int _olderThanUTC;
00950     osg::observer_ptr<Cache> _cache;
00951 };
00952 
00953 struct AsyncInsert : public TaskRequest {
00954     AsyncInsert( const TileKey& key, const CacheSpec& spec, const osg::Image* image, AsyncCache* cache )
00955         : _cacheSpec(spec), _key(key), _image(image), _cache(cache) { }
00956 
00957     void operator()( ProgressCallback* progress ) {
00958         osg::ref_ptr<AsyncCache> cache = _cache.get();
00959         if ( cache.valid() )
00960             cache->setImageSync( _key, _cacheSpec, _image.get() );
00961     }
00962 
00963     CacheSpec _cacheSpec;
00964     TileKey _key;
00965     osg::ref_ptr<const osg::Image> _image;
00966     osg::observer_ptr<AsyncCache> _cache;
00967 };
00968 
00969 class Sqlite3Cache;
00970 struct AsyncUpdateAccessTime : public TaskRequest
00971 {
00972     AsyncUpdateAccessTime( const TileKey& key, const std::string& cacheId, int timeStamp, Sqlite3Cache* cache );
00973     void operator()( ProgressCallback* progress );
00974 
00975     TileKey _key;
00976     std::string _cacheId;
00977     int _timeStamp;
00978     osg::observer_ptr<Sqlite3Cache> _cache;
00979 };
00980 
00981 
00982 
00983 struct AsyncUpdateAccessTimePool : public TaskRequest
00984 {
00985     AsyncUpdateAccessTimePool( const std::string& cacheId, Sqlite3Cache* cache );
00986     void addEntry(const TileKey& key, int timeStamp);
00987     void addEntryInternal(const TileKey& key);
00988 
00989     void operator()( ProgressCallback* progress );
00990     const std::string& getCacheId() { return _cacheId; }
00991     int getNbEntry() const { return _keys.size(); }
00992     std::map<std::string, int> _keys;
00993     std::string _cacheId;
00994     std::string _keyStr;
00995     int _timeStamp;
00996     osg::observer_ptr<Sqlite3Cache> _cache;
00997 };
00998 
00999 
01000 
01001 // --------------------------------------------------------------------------
01002 
01003 struct ThreadTable {
01004     ThreadTable(LayerTable* table, sqlite3* db) : _table(table), _db(db) { }
01005     LayerTable* _table;
01006     sqlite3* _db;
01007 };
01008 
01009 class Sqlite3Cache : public AsyncCache
01010 {
01011 public:
01012     Sqlite3Cache( const CacheOptions& options ) 
01013       : AsyncCache(options), _options(options),  _db(0L)
01014     {                
01015         if ( _options.path().get().empty() || options.getReferenceURI().empty() )
01016             _databasePath = _options.path().get();
01017         else
01018         {
01019            _databasePath = osgEarth::getFullPath( options.getReferenceURI(), _options.path().get() );
01020         }
01021 
01022         
01023         setName( "sqlite3" );
01024 
01025         _nbRequest = 0;
01026 
01027         //_settings = dynamic_cast<const Sqlite3CacheOptions*>( options );
01028         //if ( !_settings.valid() )
01029         //    _settings = new Sqlite3CacheOptions( options );
01030 
01031         OE_INFO << LC << "options: " << _options.getConfig().toString() << std::endl;
01032 
01033         if ( sqlite3_threadsafe() == 0 )
01034         {
01035             OE_WARN << LC << "SQLITE3 IS NOT COMPILED IN THREAD-SAFE MODE" << std::endl;
01036             // TODO: something in this unlikely condition
01037         }
01038 
01039         // enabled shared cache mode.
01040         //sqlite3_enable_shared_cache( 1 );
01041 
01042 #ifdef USE_L2_CACHE
01043         _L2cache = new MemCache();
01044         _L2cache->setMaxNumTilesInCache( 64 );
01045         OE_INFO << LC << "Using L2 memory cache" << std::endl;
01046 #endif
01047         
01048         _db = openDatabase( _databasePath, _options.serialized().value() );
01049 
01050         if ( _db )
01051         {
01052             if ( ! _metadata.initialize( _db ) )
01053                 _db = 0L;
01054         }
01055 
01056         if ( _db && _options.asyncWrites() == true )
01057         {
01058             _writeService = new osgEarth::TaskService( "Sqlite3Cache Write Service", 1 );
01059         }
01060 
01061         
01062         if (!_metadata.loadAllLayers( _db, _layersList )) {
01063             OE_WARN << "can't read layers in meta data" << std::endl;
01064         }
01065 
01066     }
01067 
01068     // just here to satisfy the osg::Object requirements
01069     Sqlite3Cache() { }
01070     Sqlite3Cache( const Sqlite3Cache& rhs, const osg::CopyOp& op ) { }
01071     META_Object(osgEarth,Sqlite3Cache);
01072 
01073 public: // Cache interface
01074     
01078     bool isCached( const TileKey& key, const CacheSpec& spec ) const
01079     {
01080         // this looks ineffecient, but usually when isCached() is called, getImage() will be
01081         // called soon thereafter. And this call will load it into the L2 cache so the subsequent
01082         // getImage call will not hit the DB again.
01083         osg::ref_ptr<const osg::Image> temp;
01084         return const_cast<Sqlite3Cache*>(this)->getImage( key, spec, temp );
01085     }
01086 
01090     virtual void storeProperties( const CacheSpec& spec, const Profile* profile, unsigned int tileSize ) 
01091     {
01092         if ( !_db ) return;
01093 
01094         if ( spec.cacheId().empty() || profile == 0L || spec.format().empty() )
01095         {
01096             OE_WARN << "ILLEGAL: cannot cache a layer without a layer id" << std::endl;
01097             return;
01098         }
01099 
01100         ScopedLock<Mutex> lock( _tableListMutex ); // b/c we're using the base db handle
01101 #ifdef SPLIT_LAYER_DB
01102         sqlite3* db = getOrCreateMetaDbForThread();
01103 #else
01104         sqlite3* db = getOrCreateDbForThread();
01105 #endif
01106         if ( !db )
01107             return;
01108 
01109         //OE_INFO << "Storing metadata for layer \"" << layerName << "\"" << std::endl;
01110 
01111         MetadataRecord rec;
01112         rec._layerName = spec.cacheId();
01113         rec._profile = profile;
01114         rec._tileSize = tileSize;
01115 
01116 #ifdef USE_SERIALIZERS
01117         rec._format = "osgb";
01118         rec._compressor = "zlib";
01119 #else
01120         rec._format = spec.format();
01121 #endif
01122 
01123         _metadata.store( rec, db );
01124     }
01125 
01129     virtual bool loadProperties( 
01130         const std::string&           cacheId, 
01131         CacheSpec&                   out_spec, 
01132         osg::ref_ptr<const Profile>& out_profile,
01133         unsigned int&                out_tileSize ) 
01134     {
01135         if ( !_db ) return 0L;
01136 
01137         ScopedLock<Mutex> lock( _tableListMutex ); // b/c we're using the base db handle
01138 
01139 #ifdef SPLIT_LAYER_DB
01140         sqlite3* db = getOrCreateMetaDbForThread();
01141 #else
01142         sqlite3* db = getOrCreateDbForThread();
01143 #endif
01144         if ( !db )
01145             return 0L;
01146 
01147         OE_DEBUG << LC << "Loading metadata for layer \"" << cacheId << "\"" << std::endl;
01148 
01149         MetadataRecord rec;
01150         if ( _metadata.load( cacheId, db, rec ) )
01151         {
01152             out_spec = CacheSpec( rec._layerName, rec._format );
01153             out_tileSize = rec._tileSize;
01154             out_profile = rec._profile;
01155         }
01156         return 0L;
01157     }
01158 
01162     bool getImage( const TileKey& key, const CacheSpec& spec, osg::ref_ptr<const osg::Image>& out_image )
01163     {
01164         if ( !_db ) return false;
01165 
01166         // wait if we are purging the db
01167         ScopedLock<Mutex> lock2( _pendingPurgeMutex );
01168 
01169         // first try the L2 cache.
01170         if ( _L2cache.valid() )
01171         {
01172             if ( _L2cache->getImage( key, spec, out_image ) )
01173                 return true;
01174         }
01175 
01176         // next check the deferred-write queue.
01177         if ( _options.asyncWrites() == true )
01178         {
01179 #ifdef INSERT_POOL
01180             ScopedLock<Mutex> lock( _pendingWritesMutex );
01181             std::string name = layerName;
01182             std::map<std::string, osg::ref_ptr<AsyncInsertPool> >::iterator it = _pendingWrites.find(name);
01183             if (it != _pendingWrites.end()) {
01184                 AsyncInsertPool* p = it->second.get();
01185                 if (p) {
01186                     osg::Image* img = p->findImage(key.str());
01187                     if (img) {
01188                         // todo: update the access time, or let it slide?
01189                         OE_DEBUG << LC << "Got key that is write-queued: " << key.str() << std::endl;
01190                         return img;
01191                     }
01192                 }
01193             }
01194 #else
01195             ScopedLock<Mutex> lock( _pendingWritesMutex );
01196             std::string name = key.str() + spec.cacheId(); //layerName;
01197             std::map<std::string,osg::ref_ptr<AsyncInsert> >::iterator i = _pendingWrites.find(name);
01198             if ( i != _pendingWrites.end() )
01199             {
01200                 // todo: update the access time, or let it slide?
01201                 OE_DEBUG << LC << "Got key that is write-queued: " << key.str() << std::endl;
01202                 out_image = i->second->_image.get();
01203                 return out_image.valid();
01204                 //return i->second->_image.get();
01205             }
01206 #endif
01207         }
01208 
01209         // finally, try to query the database.
01210         ThreadTable tt = getTable( spec.cacheId() ); //layerName);
01211         if ( tt._table )
01212         {
01213             ImageRecord rec( key );
01214             if (!tt._table->load( key, rec, tt._db ))
01215                 return false;
01216 
01217             // load it into the L2 cache
01218             out_image = rec._image.release();
01219 
01220             if ( out_image.valid() && _L2cache.valid() )
01221                 _L2cache->setImage( key, spec, out_image.get() );
01222 
01223 #ifdef UPDATE_ACCESS_TIMES
01224 
01225 #ifdef UPDATE_ACCESS_TIMES_POOL
01226             // update the last-access time
01227             int t = (int)::time(0L);
01228             {
01229                 ScopedLock<Mutex> lock( _pendingUpdateMutex );
01230                 osg::ref_ptr<AsyncUpdateAccessTimePool> pool;
01231                 std::map<std::string,osg::ref_ptr<AsyncUpdateAccessTimePool> >::iterator i = _pendingUpdates.find( spec.cacheId() ); //layerName);
01232                 if ( i != _pendingUpdates.end() )
01233                 {
01234                     i->second->addEntry(key, t);
01235                     pool = i->second;
01236                     OE_DEBUG << LC << "Add key " << key.str() << " to existing layer batch " << spec.name() << std::endl;
01237                 } else {
01238                     pool = new AsyncUpdateAccessTimePool(spec.cacheId(), this);
01239                     pool->addEntry(key, t);
01240                     _pendingUpdates[spec.cacheId()] = pool.get();
01241                     _writeService->add(pool.get());
01242                 }
01243             }
01244 #else
01245             // update the last-access time
01246             int t = (int)::time(0L);
01247             _writeService->add( new AsyncUpdateAccessTime(  key, layerName, t, this ) );
01248 #endif
01249 
01250 #endif // UPDATE_ACCESS_TIMES
01251 
01252             return out_image.valid();
01253         }
01254         else
01255         {
01256             OE_DEBUG << LC << "What, no layer table?" << std::endl;
01257         }
01258         return false;
01259     }
01260 
01264     void setImage( const TileKey& key, const CacheSpec& spec, const osg::Image* image )
01265     {        
01266         if ( !_db ) return;
01267 
01268         if ( _options.asyncWrites() == true )
01269         {
01270             // the "pending writes" table is here so that we don't try to write data to
01271             // the cache more than once when using an asynchronous write service.
01272             ScopedLock<Mutex> lock( _pendingWritesMutex );
01273 #ifdef INSERT_POOL
01274             std::string name = layerName;
01275             std::map<std::string, osg::ref_ptr<AsyncInsertPool> >::iterator it = _pendingWrites.find(name);
01276             if ( it == _pendingWrites.end() )
01277             {
01278                 AsyncInsertPool* req = new AsyncInsertPool(layerName, this);
01279                 req->addEntry(key, format, image);
01280                 _pendingWrites[name] = req;
01281                 _writeService->add( req );
01282             }
01283             else
01284             {
01285                 it->second->addEntry(key, format, image);
01286             }
01287 #else
01288             std::string name = key.str() + spec.cacheId();
01289             if ( _pendingWrites.find(name) == _pendingWrites.end() )
01290             {
01291                 AsyncInsert* req = new AsyncInsert(key, spec, image, this);
01292                 _pendingWrites[name] = req;
01293                 _writeService->add( req );
01294             }
01295             else
01296             {
01297                 //NOTE: this should probably never happen.
01298                 OE_WARN << LC << "Tried to setImage; already in queue: " << key.str() << std::endl;
01299             }
01300 #endif
01301         }
01302         else
01303         {
01304 
01305             setImageSync( key, spec, image );
01306         }
01307     }
01308 
01312     bool purge( const std::string& layerName, int olderThanUTC, bool async )
01313     {
01314         if ( !_db ) return false;
01315 
01316         // purge the L2 cache first:
01317         if ( async == true && _options.asyncWrites() == true )
01318         {
01319 #ifdef PURGE_GENERAL
01320             if (!_pendingPurges.empty())
01321                 return false;
01322             ScopedLock<Mutex> lock( _pendingPurgeMutex );
01323             AsyncPurge* req = new AsyncPurge(layerName, olderThanUTC, this);
01324             _writeService->add( req);
01325             _pendingPurges[layerName] = req;
01326 #else
01327             if (_pendingPurges.find(layerName) != _pendingPurges.end()) {
01328                 return false;
01329             } else {
01330                 ScopedLock<Mutex> lock( _pendingPurgeMutex );
01331                 AsyncPurge* req = new AsyncPurge(layerName, olderThanUTC, this);
01332                 _writeService->add( req);
01333                 _pendingPurges[layerName] = req;
01334             }
01335 #endif
01336         }
01337         else
01338         {
01339 #ifdef PURGE_GENERAL
01340             ScopedLock<Mutex> lock( _pendingPurgeMutex );
01341 
01342             sqlite3_int64 limit = _options.maxSize().value() * 1024 * 1024;
01343             std::map<std::string, std::pair<sqlite3_int64,int> > layers;
01344             sqlite3_int64 totalSize = 0;
01345             for (unsigned int i = 0; i < _layersList.size(); ++i) {
01346                 ThreadTable tt = getTable( _layersList[i] );
01347                 if ( tt._table ) {
01348                     sqlite3_int64 size = tt._table->getTableSize(tt._db);
01349                     layers[_layersList[i] ].first = size;
01350                     layers[_layersList[i] ].second = tt._table->getNbEntry(tt._db);
01351                     totalSize += size;
01352                 }
01353             }
01354             OE_INFO << LC << "SQlite cache size " << totalSize/(1024*1024) << " MB" << std::endl;
01355             if (totalSize > 1.2 * limit) {
01356                 sqlite3_int64 diff = totalSize - limit;
01357                 for (unsigned int i = 0; i < _layersList.size(); ++i) {
01358                     float ratio = layers[_layersList[i] ].first * 1.0 / (float)(totalSize);
01359                     int sizeToRemove = (int)floor(ratio * diff);
01360                     if (sizeToRemove > 0) {
01361                         if (sizeToRemove / 1024 > 1024) {
01362                             OE_DEBUG << "Try to remove " << sizeToRemove/(1024*1024) << " MB in " << _layersList[i] << std::endl;
01363                         } else {
01364                             OE_DEBUG << "Try to remove " << sizeToRemove/1024 << " KB in " << _layersList[i] << std::endl;
01365                         }
01366 
01367                         if ( _L2cache.valid() )
01368                             _L2cache->purge( _layersList[i], olderThanUTC, async );
01369                         ThreadTable tt = getTable(_layersList[i]);
01370                         if ( tt._table ) {
01371                             float averageSizePerElement = layers[_layersList[i] ].first * 1.0 /layers[_layersList[i] ].second;
01372                             int nb = (int)floor(sizeToRemove / averageSizePerElement);
01373                             if (nb ) {
01374                                 OE_DEBUG << "remove " << nb << " / " << layers[_layersList[i] ].second << " elements in " << _layersList[i] << std::endl;
01375                                 tt._table->purge(olderThanUTC, nb, tt._db);
01376                             }
01377                         }
01378                     }
01379                 }
01380             }
01381             _pendingPurges.clear();
01382             displayPendingOperations();
01383 
01384 #else
01385             ScopedLock<Mutex> lock( _pendingPurgeMutex );
01386             if ( _L2cache.valid() )
01387                 _L2cache->purge( layerName, olderThanUTC, async );
01388 
01389             ThreadTable tt = getTable( layerName );
01390             if ( tt._table )
01391             {
01392                 _pendingPurges.erase( layerName );
01393 
01394                 unsigned int maxsize = _options.getSize(layerName);
01395                 tt._table->checkAndPurgeIfNeeded(tt._db, maxsize * 1024 * 1024);
01396                 displayPendingOperations();
01397             }
01398 #endif
01399         }
01400         return true;
01401     }
01402 
01406     bool updateAccessTimeSync( const std::string& layerName, const TileKey& key, int newTimestamp )
01407     {
01408         if ( !_db ) return false;
01409 
01410         ThreadTable tt = getTable(layerName);
01411         if ( tt._table )
01412         {
01413             tt._table->updateAccessTime( key, newTimestamp, tt._db );
01414         }
01415         return true;
01416     }
01417 
01421     bool updateAccessTimeSyncPool( const std::string& layerName, const std::string& keys, int newTimestamp )
01422     {
01423         if ( !_db ) return false;
01424 
01425         ThreadTable tt = getTable(layerName);
01426         if ( tt._table )
01427         {
01428             tt._table->updateAccessTimePool( keys, newTimestamp, tt._db );
01429         }
01430 
01431         {
01432             ScopedLock<Mutex> lock( _pendingUpdateMutex );
01433             _pendingUpdates.erase( layerName );
01434             displayPendingOperations();
01435         }
01436         return true;
01437     }
01438 
01439 #ifdef INSERT_POOL
01440     void setImageSyncPool( AsyncInsertPool* pool, const std::string& layerName)
01441     {
01442         ScopedLock<Mutex> lock( _pendingWritesMutex );
01443         const AsyncInsertPool::PoolContainer& entries = pool->_pool;
01444         OE_WARN << "write " << entries.size() << std::endl;
01445         ThreadTable tt = getTable(layerName);
01446         if ( tt._table )
01447         {
01448             sqlite3_stmt* insert;
01449             if (!tt._table->beginStore( tt._db, insert )) {
01450                 return;
01451             }
01452             if (!tt._table->storePool( tt._db, entries, insert )) {
01453                 return;
01454             }
01455             if (!tt._table->endStore( tt._db, insert )) {
01456                 return;
01457             }
01458         }
01459         _pendingWrites.erase( layerName );
01460         displayPendingOperations();
01461     }
01462 #endif
01463 
01464 private:
01465 
01466     void displayPendingOperations() {
01467         if (_pendingWrites.size())
01468             OE_DEBUG<< LC << "pending insert " << _pendingWrites.size() << std::endl;
01469         if (_pendingUpdates.size())
01470             OE_DEBUG << LC << "pending update " << _pendingUpdates.size() << std::endl;
01471         if (_pendingPurges.size())
01472             OE_DEBUG << LC << "pending purge " << _pendingPurges.size() << std::endl;
01473         //OE_INFO << LC << "Pending writes: " << std::dec << _writeService->getNumRequests() << std::endl;
01474     }
01475 
01476     void setImageSync( const TileKey& key, const CacheSpec& spec, const osg::Image* image )
01477     {
01478         if (_options.maxSize().value() > 0 && _nbRequest > MAX_REQUEST_TO_RUN_PURGE) {
01479             int t = (int)::time(0L);
01480             purge(spec.cacheId(), t, _options.asyncWrites().value() );
01481             _nbRequest = 0;
01482         }
01483         _nbRequest++;
01484 
01485         ThreadTable tt = getTable( spec.cacheId() );
01486         if ( tt._table )
01487         {
01488             ::time_t t = ::time(0L);
01489             ImageRecord rec( key );
01490             rec._created = (int)t;
01491             rec._accessed = (int)t;
01492             rec._image = image;
01493 
01494             tt._table->store( rec, tt._db );
01495         }
01496 
01497         if ( _options.asyncWrites() == true )
01498         {
01499             ScopedLock<Mutex> lock( _pendingWritesMutex );
01500             std::string name = key.str() + spec.cacheId();
01501             _pendingWrites.erase( name );
01502             displayPendingOperations();
01503         }
01504     }
01505 
01506 
01507 #ifdef SPLIT_LAYER_DB
01508     sqlite3* getOrCreateDbForThread(const std::string& layer)
01509     {
01510         sqlite3* db = 0L;
01511 
01512         // this method assumes the thread already holds a lock on _tableListMutex, which
01513         // doubles to protect _dbPerThread
01514 
01515         Thread* thread = Thread::CurrentThread();
01516         std::map<Thread*,sqlite3*>::const_iterator k = _dbPerThreadLayers[layer].find(thread);
01517         if ( k == _dbPerThreadLayers[layer].end() )
01518         {
01519             db = openDatabase( layer + _options.path().value(), _options.serialized().value() );
01520             if ( db )
01521             {
01522                 _dbPerThreadLayers[layer][thread] = db;
01523                 OE_INFO << LC << "Created DB handle " << std::hex << db << " for thread " << thread << std::endl;
01524             }
01525             else
01526             {
01527                 OE_WARN << LC << "Failed to open DB on thread " << thread << std::endl;
01528             }
01529         }
01530         else
01531         {
01532             db = k->second;
01533         }
01534 
01535         return db;
01536     }
01537 
01538 
01539     sqlite3* getOrCreateMetaDbForThread()
01540     {
01541         sqlite3* db = 0L;
01542 
01543         // this method assumes the thread already holds a lock on _tableListMutex, which
01544         // doubles to protect _dbPerThread
01545 
01546         Thread* thread = Thread::CurrentThread();
01547         std::map<Thread*,sqlite3*>::const_iterator k = _dbPerThreadMeta.find(thread);
01548         if ( k == _dbPerThreadMeta.end() )
01549         {
01550             db = openDatabase( _options.path().value(), _options.serialized().value() );
01551             if ( db )
01552             {
01553                 _dbPerThreadMeta[thread] = db;
01554                 OE_INFO << LC << "Created DB handle " << std::hex << db << " for thread " << thread << std::endl;
01555             }
01556             else
01557             {
01558                 OE_WARN << LC << "Failed to open DB on thread " << thread << std::endl;
01559             }
01560         }
01561         else
01562         {
01563             db = k->second;
01564         }
01565 
01566         return db;
01567     }
01568 
01569 #else
01570     sqlite3* getOrCreateDbForThread()
01571     {
01572         sqlite3* db = 0L;
01573 
01574         // this method assumes the thread already holds a lock on _tableListMutex, which
01575         // doubles to protect _dbPerThread
01576 
01577         Thread* thread = Thread::CurrentThread();
01578         std::map<Thread*,sqlite3*>::const_iterator k = _dbPerThread.find(thread);
01579         if ( k == _dbPerThread.end() )
01580         {
01581             db = openDatabase( _databasePath, _options.serialized().value() );
01582             if ( db )
01583             {
01584                 _dbPerThread[thread] = db;
01585                 OE_DEBUG << LC << "Created DB handle " << std::hex << db << " for thread " << thread << std::endl;
01586             }
01587             else
01588             {
01589                 OE_WARN << LC << "Failed to open DB on thread " << thread << std::endl;
01590             }
01591         }
01592         else
01593         {
01594             db = k->second;
01595         }
01596 
01597         return db;
01598     }
01599 #endif
01600 
01601     // gets the layer table for the specified layer name, creating it if it does
01602     // not already exist...
01603     ThreadTable getTable( const std::string& tableName )
01604     {
01605         ScopedLock<Mutex> lock( _tableListMutex );
01606 
01607 #ifdef SPLIT_LAYER_DB
01608         sqlite3* db = getOrCreateDbForThread(tableName);
01609 #else
01610         sqlite3* db = getOrCreateDbForThread();
01611 #endif
01612         if ( !db )
01613             return ThreadTable( 0L, 0L );
01614 
01615         LayerTablesByName::iterator i = _tables.find(tableName);
01616         if ( i == _tables.end() )
01617         {
01618             MetadataRecord meta;
01619 #ifdef SPLIT_LAYER_DB
01620             sqlite3* metadb = getOrCreateMetaDbForThread();
01621             if ( !_metadata.load( tableName, metadb, meta ) )
01622 #else
01623             if ( !_metadata.load( tableName, db, meta ) )
01624 #endif
01625             {
01626                 OE_WARN << LC << "Cannot operate on \"" << tableName << "\" because metadata does not exist."
01627                     << std::endl;
01628                 return ThreadTable( 0L, 0L );
01629             }
01630 
01631             _tables[tableName] = new LayerTable( meta, db );
01632             OE_DEBUG << LC << "New LayerTable for " << tableName << std::endl;
01633         }
01634         return ThreadTable( _tables[tableName].get(), db );
01635     }
01636 
01637 private:
01638 
01639     const Sqlite3CacheOptions _options;
01640     //osg::ref_ptr<const Sqlite3CacheOptions> _settings;
01641     osg::ref_ptr<osgDB::ReaderWriter> _defaultRW;
01642     Mutex             _tableListMutex;
01643     MetadataTable     _metadata;
01644     LayerTablesByName _tables;
01645 
01646     bool _useAsyncWrites;
01647     osg::ref_ptr<TaskService> _writeService;
01648     Mutex _pendingWritesMutex;
01649 
01650 #ifdef INSERT_POOL
01651     std::map<std::string, osg::ref_ptr<AsyncInsertPool> > _pendingWrites;
01652 #else
01653     std::map<std::string, osg::ref_ptr<AsyncInsert> > _pendingWrites;
01654 #endif
01655     Mutex _pendingUpdateMutex;
01656     std::map<std::string, osg::ref_ptr<AsyncUpdateAccessTimePool> > _pendingUpdates;
01657 
01658     Mutex _pendingPurgeMutex;
01659     std::map<std::string, osg::ref_ptr<AsyncPurge> > _pendingPurges;
01660 
01661     sqlite3* _db;
01662     std::map<Thread*,sqlite3*> _dbPerThread;
01663 
01664     std::map<std::string, std::map<Thread*,sqlite3*> > _dbPerThreadLayers;
01665     std::map<Thread*,sqlite3*> _dbPerThreadMeta;
01666 
01667     osg::ref_ptr<MemCache> _L2cache;
01668 
01669     int _count;
01670     int _nbRequest;
01671 
01672     std::vector<std::string> _layersList;
01673     std::string _databasePath;
01674 };
01675 
01676 
01677 
01678 AsyncUpdateAccessTime::AsyncUpdateAccessTime( const TileKey& key, const std::string& cacheId, int timeStamp, Sqlite3Cache* cache ) : 
01679 _key(key), _cacheId(cacheId), _timeStamp(timeStamp), _cache(cache)
01680 {
01681     //nop
01682 }
01683 
01684 void AsyncUpdateAccessTime::operator()( ProgressCallback* progress ) 
01685 { 
01686     osg::ref_ptr<Sqlite3Cache> cache = _cache.get();
01687     if ( cache.valid() ) {
01688         //OE_WARN << "AsyncUpdateAccessTime will process " << _key << std::endl;
01689         cache->updateAccessTimeSync( _cacheId, _key , _timeStamp );
01690     }
01691 }
01692 
01693 
01694 AsyncUpdateAccessTimePool::AsyncUpdateAccessTimePool(const std::string& cacheId, Sqlite3Cache* cache) :
01695 _cacheId(cacheId), _cache(cache)
01696 {
01697     //nop
01698 }
01699 
01700 void AsyncUpdateAccessTimePool::addEntry(const TileKey& key, int timeStamp)
01701 {
01702     unsigned int lod = key.getLevelOfDetail();
01703     addEntryInternal(key);
01704     if (lod > 0) {
01705         TileKey previous = key;
01706         for (int i = (int)lod-1; i >= 0; --i) {
01707             TileKey ancestor = previous.createAncestorKey(i);
01708             if (ancestor.valid())
01709                 addEntryInternal(ancestor);
01710             previous = ancestor;
01711         }
01712     }
01713     _timeStamp = timeStamp;
01714 }
01715 
01716 void AsyncUpdateAccessTimePool::addEntryInternal(const TileKey& key)
01717 {
01718     const std::string& keyStr = key.str();
01719     if (_keys.find(keyStr) == _keys.end()) {
01720         _keys[keyStr] = 1;
01721         if (_keyStr.empty())
01722             _keyStr = keyStr;
01723         else
01724             _keyStr += ", " + keyStr;
01725     }
01726 }
01727 
01728 void AsyncUpdateAccessTimePool::operator()( ProgressCallback* progress ) 
01729 { 
01730     osg::ref_ptr<Sqlite3Cache> cache = _cache.get();
01731     if ( cache.valid() ) {
01732         //OE_INFO << "AsyncUpdateAccessTimePool will process " << _keys.size() << std::endl;
01733         cache->updateAccessTimeSyncPool( _cacheId, _keyStr , _timeStamp );
01734     }
01735 }
01736 
01737 
01738 #ifdef INSERT_POOL
01739 AsyncInsertPool::AsyncInsertPool(const std::string& layerName, Sqlite3Cache* cache ) : _layerName(layerName), _cache(cache) { }
01740 void AsyncInsertPool::operator()( ProgressCallback* progress )
01741 {
01742     osg::ref_ptr<Sqlite3Cache> cache = _cache.get();
01743     if ( cache.valid() ) {
01744         cache->setImageSyncPool( this, _layerName );
01745     }
01746 }
01747 #endif
01748 
01749 //------------------------------------------------------------------------
01750 
01758 class Sqlite3CacheFactory : public CacheDriver
01759 {
01760 public:
01761     Sqlite3CacheFactory()
01762     {
01763         supportsExtension( "osgearth_cache_sqlite3", "Sqlite3 Cache for osgEarth" );
01764     }
01765 
01766     virtual const char* className()
01767     {
01768         return "Sqlite3 Cache for osgEarth";
01769     }
01770 
01771     virtual ReadResult readObject(const std::string& file_name, const Options* options) const
01772     {
01773         if ( !acceptsExtension(osgDB::getLowerCaseFileExtension( file_name )))
01774             return ReadResult::FILE_NOT_HANDLED;
01775 
01776         return ReadResult( new Sqlite3Cache( getCacheOptions(options) ) );
01777     }
01778 };
01779 
01780 REGISTER_OSGPLUGIN(osgearth_cache_sqlite3, Sqlite3CacheFactory)
01781 
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Defines