osgEarth 2.1.1
|
00001 /* -*-c++-*- */ 00002 /* osgEarth - Dynamic map generation toolkit for OpenSceneGraph 00003 * Copyright 2008-2010 Pelican Mapping 00004 * http://osgearth.org 00005 * 00006 * osgEarth is free software; you can redistribute it and/or modify 00007 * it under the terms of the GNU Lesser General Public License as published by 00008 * the Free Software Foundation; either version 2 of the License, or 00009 * (at your option) any later version. 00010 * 00011 * This program is distributed in the hope that it will be useful, 00012 * but WITHOUT ANY WARRANTY; without even the implied warranty of 00013 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 00014 * GNU Lesser General Public License for more details. 00015 * 00016 * You should have received a copy of the GNU Lesser General Public License 00017 * along with this program. If not, see <http://www.gnu.org/licenses/> 00018 */ 00019 #include "Sqlite3CacheOptions" 00020 00021 #include <osgEarth/FileUtils> 00022 #include <osgEarth/TaskService> 00023 #include <osgDB/FileNameUtils> 00024 #include <osgDB/FileUtils> 00025 #include <osgDB/ReaderWriter> 00026 #if OSG_MIN_VERSION_REQUIRED(2,9,5) 00027 # include <osgDB/Options> 00028 #endif 00029 #include <OpenThreads/Mutex> 00030 #include <OpenThreads/ScopedLock> 00031 #include <cstring> 00032 #include <fstream> 00033 00034 // for the compressor stuff 00035 #if OSG_MIN_VERSION_REQUIRED(2,9,8) 00036 # define USE_SERIALIZERS 00037 # include <osgDB/Serializer> 00038 #endif 00039 00040 #include <sqlite3.h> 00041 00042 using namespace osgEarth; 00043 using namespace osgEarth::Drivers; 00044 using namespace OpenThreads; 00045 00046 #define LC "[Sqlite3Cache] " 00047 00048 #define USE_TRANSACTIONS 00049 #define USE_L2_CACHE 00050 00051 //#define SPLIT_DB_FILE 00052 //#define SPLIT_LAYER_DB 00053 #define UPDATE_ACCESS_TIMES 00054 #define UPDATE_ACCESS_TIMES_POOL 00055 #define MAX_REQUEST_TO_RUN_PURGE 100 00056 00057 #define PURGE_GENERAL 00058 //#define INSERT_POOL 00059 00060 //#define MONITOR_THREAD_HEALTH 00061 00062 // -------------------------------------------------------------------------- 00063 00064 // opens a database connection with default settings. 00065 static 00066 sqlite3* openDatabase( const std::string& path, bool serialized ) 00067 { 00068 //Try to create the path if it doesn't exist 00069 std::string dirPath = osgDB::getFilePath(path); 00070 00071 //If the path doesn't currently exist or we can't create the path, don't cache the file 00072 if (!osgDB::fileExists(dirPath) && !osgDB::makeDirectory(dirPath)) 00073 { 00074 OE_WARN << LC << "Couldn't create path " << dirPath << std::endl; 00075 } 00076 00077 sqlite3* db = 0L; 00078 00079 // not sure if SHAREDCACHE is necessary or wise 00080 int flags = SQLITE_OPEN_READWRITE | SQLITE_OPEN_CREATE; 00081 flags |= serialized ? SQLITE_OPEN_FULLMUTEX : SQLITE_OPEN_NOMUTEX; 00082 00083 int rc = sqlite3_open_v2( path.c_str(), &db, flags, 0L ); 00084 00085 if ( rc != 0 ) 00086 { 00087 OE_WARN << LC << "Failed to open cache \"" << path << "\": " << sqlite3_errmsg(db) << std::endl; 00088 return 0L; 00089 } 00090 00091 // make sure that writes actually finish 00092 sqlite3_busy_timeout( db, 60000 ); 00093 00094 return db; 00095 } 00096 00097 // -------------------------------------------------------------------------- 00098 00099 // a slightly customized Cache class that will support asynchronous writes 00100 struct AsyncCache : public Cache 00101 { 00102 public: 00103 AsyncCache(const CacheOptions& options =CacheOptions()): Cache(options) { } 00104 virtual void setImageSync( 00105 const TileKey& key, 00106 const CacheSpec& spec, 00107 const osg::Image* image ) =0; 00108 }; 00109 00110 00111 // -------------------------------------------------------------------------- 00112 00113 struct MetadataRecord 00114 { 00115 std::string _layerName; 00116 std::string _format; 00117 int _tileSize; 00118 osg::ref_ptr<const Profile> _profile; 00119 std::string _compressor; 00120 }; 00121 00125 struct MetadataTable 00126 { 00127 MetadataTable() { } 00128 00129 bool initialize( sqlite3* db ) 00130 { 00131 std::string sql = 00132 "CREATE TABLE IF NOT EXISTS metadata (" 00133 "layer varchar(255) PRIMARY KEY UNIQUE, " 00134 "format varchar(255), " 00135 "compressor varchar(64), " 00136 "tilesize int, " 00137 "srs varchar(1024), " 00138 "xmin double, " 00139 "ymin double, " 00140 "xmax double, " 00141 "ymax double, " 00142 "tw int, " 00143 "th int )"; 00144 00145 OE_DEBUG << LC << "SQL = " << sql << std::endl; 00146 00147 char* errMsg = 0L; 00148 int err = sqlite3_exec( db, sql.c_str(), 0L, 0L, &errMsg ); 00149 if ( err != SQLITE_OK ) 00150 { 00151 OE_WARN << LC << "[Sqlite3Cache] Creating metadata: " << errMsg << std::endl; 00152 sqlite3_free( errMsg ); 00153 return false; 00154 } 00155 00156 // prep the insert/select statement SQL strings: 00157 _insertSQL = 00158 "INSERT OR REPLACE INTO metadata " 00159 "(layer,format,compressor,tilesize,srs,xmin,ymin,xmax,ymax,tw,th) " 00160 "VALUES (?,?,?,?,?,?,?,?,?,?,?)"; 00161 00162 _selectSQL = 00163 "SELECT layer,format,compressor,tilesize,srs,xmin,ymin,xmax,ymax,tw,th " 00164 "FROM metadata WHERE layer = ?"; 00165 00166 return true; 00167 } 00168 00169 bool store( const MetadataRecord& rec, sqlite3* db ) 00170 { 00171 sqlite3_stmt* insert = 0; 00172 int rc = sqlite3_prepare_v2( db, _insertSQL.c_str(), _insertSQL.length(), &insert, 0L ); 00173 if ( rc != SQLITE_OK ) 00174 { 00175 OE_WARN 00176 << LC << "Error preparing SQL: " 00177 << sqlite3_errmsg( db ) 00178 << "(SQL: " << _insertSQL << ")" 00179 << std::endl; 00180 return false; 00181 } 00182 00183 sqlite3_bind_text( insert, 1, rec._layerName.c_str(), -1, 0L ); 00184 sqlite3_bind_text( insert, 2, rec._format.c_str(), -1, 0L ); 00185 sqlite3_bind_text( insert, 3, rec._compressor.c_str(), -1, 0L ); 00186 sqlite3_bind_int ( insert, 4, rec._tileSize ); 00187 sqlite3_bind_text( insert, 5, rec._profile->getSRS()->getInitString().c_str(), -1, 0L ); 00188 sqlite3_bind_double( insert, 6, rec._profile->getExtent().xMin() ); 00189 sqlite3_bind_double( insert, 7, rec._profile->getExtent().yMin() ); 00190 sqlite3_bind_double( insert, 8, rec._profile->getExtent().xMax() ); 00191 sqlite3_bind_double( insert, 9, rec._profile->getExtent().yMax() ); 00192 unsigned int tw, th; 00193 rec._profile->getNumTiles( 0, tw, th ); 00194 sqlite3_bind_int( insert, 10, tw ); 00195 sqlite3_bind_int( insert, 11, th ); 00196 00197 bool success; 00198 00199 rc = sqlite3_step(insert); 00200 if ( rc != SQLITE_DONE ) 00201 { 00202 OE_WARN << LC << "SQL INSERT failed: " << sqlite3_errmsg( db ) 00203 << "; SQL = " << _insertSQL 00204 << std::endl; 00205 success = false; 00206 } 00207 else 00208 { 00209 OE_DEBUG << LC << "Stored metadata record for \"" << rec._layerName << "\"" << std::endl; 00210 success = true; 00211 } 00212 00213 sqlite3_finalize( insert ); 00214 return success; 00215 } 00216 00217 bool load( const std::string& key, sqlite3* db, MetadataRecord& output ) 00218 { 00219 bool success = true; 00220 00221 sqlite3_stmt* select = 0L; 00222 int rc = sqlite3_prepare_v2( db, _selectSQL.c_str(), _selectSQL.length(), &select, 0L ); 00223 if ( rc != SQLITE_OK ) 00224 { 00225 OE_WARN 00226 << LC << "Error preparing SQL: " 00227 << sqlite3_errmsg( db ) 00228 << "(SQL: " << _insertSQL << ")" 00229 << std::endl; 00230 return false; 00231 } 00232 00233 sqlite3_bind_text( select, 1, key.c_str(), -1, 0L ); 00234 00235 rc = sqlite3_step( select ); 00236 if ( rc == SQLITE_ROW ) 00237 { 00238 // got a result 00239 output._layerName = (char*)sqlite3_column_text( select, 0 ); 00240 output._format = (char*)sqlite3_column_text( select, 1 ); 00241 output._compressor = (char*)sqlite3_column_text( select, 2 ); 00242 output._tileSize = sqlite3_column_int( select, 3 ); 00243 ProfileOptions pconf; 00244 pconf.srsString() = (char*)sqlite3_column_text( select, 4 ); 00245 pconf.bounds() = Bounds( 00246 sqlite3_column_double( select, 5 ), 00247 sqlite3_column_double( select, 6 ), 00248 sqlite3_column_double( select, 7 ), 00249 sqlite3_column_double( select, 8 ) ); 00250 pconf.numTilesWideAtLod0() = sqlite3_column_int( select, 9 ); 00251 pconf.numTilesHighAtLod0() = sqlite3_column_int( select, 10 ); 00252 output._profile = Profile::create( pconf ); 00253 success = true; 00254 } 00255 else 00256 { 00257 // no result 00258 OE_DEBUG << "NO metadata record found for \"" << key << "\"" << std::endl; 00259 success = false; 00260 } 00261 00262 sqlite3_finalize( select ); 00263 return success; 00264 } 00265 00266 00267 bool loadAllLayers( sqlite3* db, std::vector<std::string>& output ) 00268 { 00269 bool success = true; 00270 00271 sqlite3_stmt* select = 0L; 00272 std::string selectLayersSQL = "select layer from \"metadata\""; 00273 int rc = sqlite3_prepare_v2( db, selectLayersSQL.c_str(), selectLayersSQL.length(), &select, 0L ); 00274 if ( rc != SQLITE_OK ) 00275 { 00276 OE_WARN 00277 << LC << "Error preparing SQL: " 00278 << sqlite3_errmsg( db ) 00279 << "(SQL: " << _insertSQL << ")" 00280 << std::endl; 00281 return false; 00282 } 00283 00284 success = true; 00285 rc = sqlite3_step( select ); 00286 while (rc == SQLITE_ROW) { 00287 output.push_back((char*)sqlite3_column_text( select, 0 )); 00288 rc = sqlite3_step( select ); 00289 } 00290 00291 if (rc != SQLITE_DONE) 00292 { 00293 // no result 00294 OE_WARN << "NO layers found in metadata" << std::endl; 00295 success = false; 00296 } 00297 00298 sqlite3_finalize( select ); 00299 return success; 00300 } 00301 00302 std::string _insertSQL; 00303 std::string _selectSQL; 00304 }; 00305 00306 // -------------------------------------------------------------------------- 00307 00308 struct ImageRecord 00309 { 00310 ImageRecord( const TileKey& key ) : _key(key) { } 00311 TileKey _key; 00312 int _created; 00313 int _accessed; 00314 osg::ref_ptr<const osg::Image> _image; 00315 }; 00316 00317 #ifdef INSERT_POOL 00318 class Sqlite3Cache; 00319 struct AsyncInsertPool : public TaskRequest { 00320 struct Entry { 00321 TileKey _key; 00322 osg::ref_ptr<osg::Image> _image; 00323 std::string _format; 00324 Entry() {} 00325 Entry(const TileKey& key, const std::string& format, osg::Image* img) : _key(key), _format(format), _image(img) {} 00326 }; 00327 00328 typedef std::map<std::string, Entry> PoolContainer; 00329 00330 AsyncInsertPool(const std::string& layerName, Sqlite3Cache* cache ); 00331 00332 void addEntry( const TileKey& key, const std::string& format, osg::Image* image) 00333 { 00334 const std::string& keyStr = key.str(); 00335 if (_pool.find(keyStr) != _pool.end()) 00336 return; 00337 _pool[keyStr] = Entry(key, format, image); 00338 } 00339 00340 osg::Image* findImage(const std::string& key) 00341 { 00342 PoolContainer::iterator it = _pool.find(key); 00343 if (it != _pool.end()) { 00344 return it->second._image.get(); 00345 } 00346 return 0; 00347 } 00348 00349 void operator()( ProgressCallback* progress ); 00350 00351 PoolContainer _pool; 00352 std::string _layerName; 00353 osg::observer_ptr<Sqlite3Cache> _cache; 00354 }; 00355 #endif 00356 00361 struct LayerTable : public osg::Referenced 00362 { 00363 LayerTable( const MetadataRecord& meta, sqlite3* db ) 00364 : _meta(meta) 00365 { 00366 _tableName = "layer_" + _meta._layerName; 00367 // create the table and load the processors. 00368 if ( ! initialize( db ) ) 00369 { 00370 return; 00371 } 00372 00373 // initialize the SELECT statement for fetching records 00374 std::stringstream buf; 00375 #ifdef SPLIT_DB_FILE 00376 buf << "SELECT created,accessed,size FROM \"" << tableName << "\" WHERE key = ?"; 00377 #else 00378 buf << "SELECT created,accessed,data FROM \"" << _tableName << "\" WHERE key = ?"; 00379 #endif 00380 _selectSQL = buf.str(); 00381 00382 // initialize the UPDATE statement for updating the timestamp of an accessed record 00383 buf.str(""); 00384 buf << "UPDATE \"" << _tableName << "\" SET accessed = ? " 00385 << "WHERE key = ?"; 00386 _updateTimeSQL = buf.str(); 00387 00388 buf.str(""); 00389 buf << "UPDATE \"" << _tableName << "\" SET accessed = ? " 00390 << "WHERE key in ( ? )"; 00391 _updateTimePoolSQL = buf.str(); 00392 00393 // initialize the INSERT statement for writing records. 00394 buf.str(""); 00395 buf << "INSERT OR REPLACE INTO \"" << _tableName << "\" " 00396 #ifdef SPLIT_DB_FILE 00397 << "(key,created,accessed,size) VALUES (?,?,?,?)"; 00398 #else 00399 << "(key,created,accessed,data) VALUES (?,?,?,?)"; 00400 #endif 00401 _insertSQL = buf.str(); 00402 00403 // initialize the DELETE statements for purging old records. 00404 buf.str(""); 00405 buf << "DELETE FROM \"" << _tableName << "\" " 00406 << "INDEXED BY \"" << _tableName << "_lruindex\" " 00407 << "WHERE accessed < ?"; 00408 _purgeSQL = buf.str(); 00409 00410 buf.str(""); 00411 buf << "DELETE FROM \"" << _tableName << "\" WHERE key in (SELECT key FROM \"" << _tableName << "\" WHERE \"accessed\" < ? limit ?)"; 00412 _purgeLimitSQL = buf.str(); 00413 00414 buf.str(""); 00415 buf << "SELECT key FROM \"" << _tableName << "\" WHERE \"accessed\" < ? limit ?"; 00416 _purgeSelect = buf.str(); 00417 00418 _statsLoaded = 0; 00419 _statsStored = 0; 00420 _statsDeleted = 0; 00421 } 00422 00423 00424 sqlite3_int64 getTableSize(sqlite3* db) 00425 { 00426 #ifdef SPLIT_DB_FILE 00427 std::string query = "select sum(size) from \"" + _tableName + "\";"; 00428 #else 00429 std::string query = "select sum(length(data)) from \"" + _tableName + "\";"; 00430 #endif 00431 sqlite3_stmt* select = 0L; 00432 int rc = sqlite3_prepare_v2( db, query.c_str(), query.length(), &select, 0L ); 00433 if ( rc != SQLITE_OK ) 00434 { 00435 OE_WARN << LC << "Failed to prepare SQL: " << query << "; " << sqlite3_errmsg(db) << std::endl; 00436 return -1; 00437 } 00438 00439 rc = sqlite3_step( select ); 00440 if ( rc != SQLITE_ROW) 00441 { 00442 OE_WARN << LC << "SQL QUERY failed for " << query << ": " 00443 << sqlite3_errmsg( db ) //<< "; tries=" << (1000-tries) 00444 << ", rc = " << rc << std::endl; 00445 sqlite3_finalize( select ); 00446 return -1; 00447 } 00448 sqlite3_int64 size = sqlite3_column_int(select, 0); 00449 sqlite3_finalize( select ); 00450 return size; 00451 } 00452 00453 int getNbEntry(sqlite3* db) { 00454 std::string query = "select count(*) from \"" + _tableName + "\";"; 00455 sqlite3_stmt* select = 0L; 00456 int rc = sqlite3_prepare_v2( db, query.c_str(), query.length(), &select, 0L ); 00457 if ( rc != SQLITE_OK ) 00458 { 00459 OE_WARN << LC << "Failed to prepare SQL: " << query << "; " << sqlite3_errmsg(db) << std::endl; 00460 return -1; 00461 } 00462 00463 rc = sqlite3_step( select ); 00464 if ( rc != SQLITE_ROW) 00465 { 00466 OE_WARN << LC << "SQL QUERY failed for " << query << ": " 00467 << sqlite3_errmsg( db ) //<< "; tries=" << (1000-tries) 00468 << ", rc = " << rc << std::endl; 00469 sqlite3_finalize( select ); 00470 return -1; 00471 } 00472 int nbItems = sqlite3_column_int(select, 0); 00473 sqlite3_finalize( select ); 00474 return nbItems; 00475 } 00476 00477 void checkAndPurgeIfNeeded(sqlite3* db, unsigned int maxSize ) 00478 { 00479 int size = getTableSize(db); 00480 OE_DEBUG << _meta._layerName << std::dec << " : " << size/1024/1024 << " MB" << std::endl; 00481 if (size < 0 || size < 1.2 * maxSize) 00482 return; 00483 00484 ::time_t t = ::time(0L); 00485 int nbElements = getNbEntry(db); 00486 float averageSize = size * 1.0 / nbElements; 00487 float diffSize = size - maxSize; 00488 int maxElementToRemove = static_cast<int>(ceil(diffSize/averageSize)); 00489 OE_DEBUG << _meta._layerName << " try to remove " << std::dec << maxElementToRemove << " / " << nbElements << " to save place" << std::endl; 00490 purge(t, maxElementToRemove, db); 00491 } 00492 00493 bool store( const ImageRecord& rec, sqlite3* db ) 00494 { 00495 displayStats(); 00496 00497 sqlite3_stmt* insert = 0L; 00498 int rc = sqlite3_prepare_v2( db, _insertSQL.c_str(), _insertSQL.length(), &insert, 0L ); 00499 if ( rc != SQLITE_OK ) 00500 { 00501 OE_WARN 00502 << LC << "Error preparing SQL: " 00503 << sqlite3_errmsg( db ) 00504 << "(SQL: " << _insertSQL << ")" 00505 << std::endl; 00506 return false; 00507 } 00508 00509 // bind the key string: 00510 std::string keyStr = rec._key.str(); 00511 sqlite3_bind_text( insert, 1, keyStr.c_str(), keyStr.length(), SQLITE_STATIC ); 00512 sqlite3_bind_int( insert, 2, rec._created ); 00513 sqlite3_bind_int( insert, 3, rec._accessed ); 00514 00515 // serialize the image: 00516 #ifdef SPLIT_DB_FILE 00517 std::stringstream outStream; 00518 _rw->writeImage( *rec._image.get(), outStream, _rwOptions.get() ); 00519 std::string outBuf = outStream.str(); 00520 std::string fname = _meta._layerName + "_" + keyStr+".osgb"; 00521 { 00522 std::ofstream file(fname.c_str(), std::ios::out | std::ios::binary); 00523 if (file.is_open()) { 00524 file.write(outBuf.c_str(), outBuf.length()); 00525 } 00526 } 00527 sqlite3_bind_int( insert, 4, outBuf.length() ); 00528 #else 00529 std::stringstream outStream; 00530 _rw->writeImage( *rec._image.get(), outStream, _rwOptions.get() ); 00531 std::string outBuf = outStream.str(); 00532 sqlite3_bind_blob( insert, 4, outBuf.c_str(), outBuf.length(), SQLITE_STATIC ); 00533 #endif 00534 00535 // write to the database: 00536 rc = sqlite3_step( insert ); 00537 00538 if ( rc != SQLITE_DONE ) 00539 { 00540 OE_WARN << LC << "SQL INSERT failed for key " << rec._key.str() << ": " 00541 << sqlite3_errmsg( db ) //<< "; tries=" << (1000-tries) 00542 << ", rc = " << rc << std::endl; 00543 sqlite3_finalize( insert ); 00544 return false; 00545 } 00546 else 00547 { 00548 OE_DEBUG << LC << "cache INSERT tile " << rec._key.str() << std::endl; 00549 sqlite3_finalize( insert ); 00550 _statsStored++; 00551 return true; 00552 } 00553 } 00554 00555 #ifdef INSERT_POOL 00556 bool beginStore(sqlite3* db, sqlite3_stmt*& insert) 00557 { 00558 int rc; 00559 rc = sqlite3_exec( db, "BEGIN TRANSACTION", NULL, NULL, NULL); 00560 if (rc != 0) { 00561 OE_WARN << LC << "Failed to begin transaction batch of insert for " << _meta._layerName << " " << sqlite3_errmsg(db) << std::endl; 00562 return false; 00563 } 00564 00565 // prepare for multiple inserts 00566 rc = sqlite3_prepare_v2( db, _insertSQL.c_str(), _insertSQL.length(), &insert, 0L ); 00567 if ( rc != SQLITE_OK ) 00568 { 00569 OE_WARN 00570 << LC << "Error preparing SQL: " 00571 << sqlite3_errmsg( db ) 00572 << "(SQL: " << _insertSQL << ")" 00573 << std::endl; 00574 return false; 00575 } 00576 return true; 00577 } 00578 00579 bool storePool(sqlite3* db, const AsyncInsertPool::PoolContainer& entries, sqlite3_stmt* insert) 00580 { 00581 ::time_t t = ::time(0L); 00582 sqlite3_bind_int( insert, 2, t ); 00583 sqlite3_bind_int( insert, 3, t ); 00584 int rc; 00585 for (AsyncInsertPool::PoolContainer::const_iterator it = entries.begin(); it != entries.end(); ++it) { 00586 00587 // bind the key string: 00588 std::string keyStr = it->first; 00589 sqlite3_bind_text( insert, 1, keyStr.c_str(), keyStr.length(), SQLITE_STATIC ); 00590 00591 // serialize the image: 00592 #ifdef SPLIT_DB_FILE 00593 std::stringstream outStream; 00594 _rw->writeImage( * (it)->second._image.get(), outStream, _rwOptions.get() ); 00595 std::string outBuf = outStream.str(); 00596 std::string fname = _meta._layerName + "_" + keyStr+".osgb"; 00597 { 00598 std::ofstream file(fname.c_str(), std::ios::out | std::ios::binary); 00599 if (file.is_open()) { 00600 file.write(outBuf.c_str(), outBuf.length()); 00601 } 00602 } 00603 sqlite3_bind_int( insert, 4, outBuf.length() ); 00604 #else 00605 std::stringstream outStream; 00606 _rw->writeImage( * (it)->second._image.get(), outStream, _rwOptions.get() ); 00607 std::string outBuf = outStream.str(); 00608 sqlite3_bind_blob( insert, 4, outBuf.c_str(), outBuf.length(), SQLITE_STATIC ); 00609 #endif 00610 rc = sqlite3_step(insert); // executes the INSERT 00611 if ( rc != SQLITE_DONE ) 00612 { 00613 OE_WARN << LC << "SQL INSERT failed for key " << keyStr << ": " 00614 << sqlite3_errmsg( db ) //<< "; tries=" << (1000-tries) 00615 << ", rc = " << rc << std::endl; 00616 sqlite3_finalize(insert); // clean up prepared statement 00617 return false; 00618 } 00619 sqlite3_reset(insert); // reset the statement 00620 } 00621 00622 _statsStored += entries.size(); 00623 return true; 00624 } 00625 00626 bool endStore(sqlite3* db, sqlite3_stmt* insert) 00627 { 00628 int rc = 0; 00629 rc = sqlite3_finalize(insert); // clean up prepared statement 00630 rc = sqlite3_exec( db, "COMMIT", NULL, NULL, NULL); // COMMIT all dirty pages at once 00631 if (rc != 0) { 00632 OE_WARN << LC << "Failed to commit batch of insert for " << _meta._layerName << " " << sqlite3_errmsg(db) << std::endl; 00633 return false; 00634 } 00635 00636 return true; 00637 } 00638 #endif 00639 00640 bool updateAccessTime( const TileKey& key, int newTimestamp, sqlite3* db ) 00641 { 00642 sqlite3_stmt* update = 0L; 00643 int rc = sqlite3_prepare_v2( db, _updateTimeSQL.c_str(), _updateTimeSQL.length(), &update, 0L ); 00644 if ( rc != SQLITE_OK ) 00645 { 00646 OE_WARN << LC << "Failed to prepare SQL " << _updateTimeSQL << "; " << sqlite3_errmsg(db) << std::endl; 00647 return false; 00648 } 00649 00650 bool success = true; 00651 sqlite3_bind_int( update, 1, newTimestamp ); 00652 std::string keyStr = key.str(); 00653 sqlite3_bind_text( update, 2, keyStr.c_str(), keyStr.length(), SQLITE_STATIC ); 00654 rc = sqlite3_step( update ); 00655 if ( rc != SQLITE_DONE ) 00656 { 00657 OE_WARN << LC << "Failed to update timestamp for " << key.str() << " on layer " << _meta._layerName << " rc = " << rc << std::endl; 00658 success = false; 00659 } 00660 00661 sqlite3_finalize( update ); 00662 return success; 00663 } 00664 00665 bool updateAccessTimePool( const std::string& keyStr, int newTimestamp, sqlite3* db ) 00666 { 00667 //OE_WARN << LC << "update access times " << _meta._layerName << " " << keyStr << std::endl; 00668 sqlite3_stmt* update = 0L; 00669 int rc = sqlite3_prepare_v2( db, _updateTimePoolSQL.c_str(), _updateTimePoolSQL.length(), &update, 0L ); 00670 if ( rc != SQLITE_OK ) 00671 { 00672 OE_WARN << LC << "Failed to prepare SQL " << _updateTimePoolSQL << "; " << sqlite3_errmsg(db) << std::endl; 00673 return false; 00674 } 00675 00676 bool success = true; 00677 sqlite3_bind_int( update, 1, newTimestamp ); 00678 sqlite3_bind_text( update, 2, keyStr.c_str(), keyStr.length(), SQLITE_STATIC ); 00679 rc = sqlite3_step( update ); 00680 if ( rc != SQLITE_DONE ) 00681 { 00682 OE_WARN << LC << "Failed to update timestamp for " << keyStr << " on layer " << _meta._layerName << " rc = " << rc << std::endl; 00683 success = false; 00684 } 00685 00686 sqlite3_finalize( update ); 00687 return success; 00688 } 00689 00690 bool load( const TileKey& key, ImageRecord& output, sqlite3* db ) 00691 { 00692 displayStats(); 00693 int imageBufLen = 0; 00694 00695 sqlite3_stmt* select = 0L; 00696 int rc = sqlite3_prepare_v2( db, _selectSQL.c_str(), _selectSQL.length(), &select, 0L ); 00697 if ( rc != SQLITE_OK ) 00698 { 00699 OE_WARN << LC << "Failed to prepare SQL: " << _selectSQL << "; " << sqlite3_errmsg(db) << std::endl; 00700 return false; 00701 } 00702 00703 std::string keyStr = key.str(); 00704 sqlite3_bind_text( select, 1, keyStr.c_str(), keyStr.length(), SQLITE_STATIC ); 00705 00706 rc = sqlite3_step( select ); 00707 if ( rc != SQLITE_ROW ) // == SQLITE_DONE ) // SQLITE_DONE means "no more rows" 00708 { 00709 // cache miss 00710 OE_DEBUG << LC << "Cache MISS on tile " << key.str() << std::endl; 00711 sqlite3_finalize(select); 00712 return false; 00713 } 00714 00715 // copy the timestamps: 00716 output._created = sqlite3_column_int( select, 0 ); 00717 output._accessed = sqlite3_column_int( select, 1 ); 00718 00719 #ifdef SPLIT_DB_FILE 00720 std::string fname(keyStr); 00721 osgDB::ReaderWriter::ReadResult rr = _rw->readImage( _meta._layerName + "_" +fname+".osgb" ); 00722 #else 00723 // the pointer returned from _blob gets freed internally by sqlite, supposedly 00724 const char* data = (const char*)sqlite3_column_blob( select, 2 ); 00725 imageBufLen = sqlite3_column_bytes( select, 2 ); 00726 00727 // deserialize the image from the buffer: 00728 std::string imageString( data, imageBufLen ); 00729 std::stringstream imageBufStream( imageString ); 00730 osgDB::ReaderWriter::ReadResult rr = _rw->readImage( imageBufStream ); 00731 #endif 00732 if ( rr.error() ) 00733 { 00734 OE_WARN << LC << "Failed to read image from database: " << rr.message() << std::endl; 00735 } 00736 else 00737 { 00738 output._image = rr.takeImage(); 00739 output._key = key; 00740 OE_DEBUG << LC << "Cache HIT on tile " << key.str() << std::endl; 00741 } 00742 00743 sqlite3_finalize(select); 00744 00745 _statsLoaded++; 00746 return output._image.valid(); 00747 } 00748 00749 void displayStats() 00750 { 00751 osg::Timer_t t = osg::Timer::instance()->tick(); 00752 if (osg::Timer::instance()->delta_s( _statsLastCheck, t) > 10.0) { 00753 double d = osg::Timer::instance()->delta_s(_statsStartTimer, t); 00754 OE_DEBUG << _meta._layerName << " time " << d << " stored " << std::dec << _statsStored << " rate " << _statsStored * 1.0 / d << std::endl; 00755 OE_DEBUG << _meta._layerName << " time " << d << " loaded " << std::dec << _statsLoaded << " rate " << _statsLoaded * 1.0 / d << std::endl; 00756 OE_DEBUG << _meta._layerName << " time " << d << " deleted " << std::dec << _statsDeleted << " rate " << _statsDeleted * 1.0 / d << std::endl; 00757 _statsLastCheck = t; 00758 } 00759 } 00760 00761 bool purge( int utcTimeStamp, int maxToRemove, sqlite3* db ) 00762 { 00763 if ( maxToRemove < 0 ) 00764 return false; 00765 00766 sqlite3_stmt* purge = 0L; 00767 00768 int rc; 00769 { 00770 #ifdef SPLIT_DB_FILE 00771 { 00772 std::vector<std::string> deleteFiles; 00773 sqlite3_stmt* selectPurge = 0L; 00774 rc = sqlite3_prepare_v2( db, _purgeSelect.c_str(), _purgeSelect.length(), &selectPurge, 0L); 00775 if ( rc != SQLITE_OK ) 00776 { 00777 OE_WARN << LC << "Failed to prepare SQL: " << _purgeSelect << "; " << sqlite3_errmsg(db) << std::endl; 00778 return false; 00779 } 00780 sqlite3_bind_int( selectPurge, 2, maxToRemove ); 00781 sqlite3_bind_int( selectPurge, 1, utcTimeStamp ); 00782 00783 rc = sqlite3_step( selectPurge ); 00784 if ( rc != SQLITE_ROW && rc != SQLITE_DONE) 00785 { 00786 OE_WARN << LC << "SQL QUERY failed for " << _purgeSelect << ": " 00787 << sqlite3_errmsg( db ) //<< "; tries=" << (1000-tries) 00788 << ", rc = " << rc << std::endl; 00789 sqlite3_finalize( selectPurge ); 00790 return false; 00791 } 00792 while (rc == SQLITE_ROW) { 00793 std::string f((const char*)sqlite3_column_text( selectPurge, 0 )); 00794 deleteFiles.push_back(f); 00795 rc = sqlite3_step( selectPurge ); 00796 } 00797 if (rc != SQLITE_DONE) { 00798 OE_WARN << LC << "SQL QUERY failed for " << _purgeSelect << ": " 00799 << sqlite3_errmsg( db ) //<< "; tries=" << (1000-tries) 00800 << ", rc = " << rc << std::endl; 00801 sqlite3_finalize( selectPurge ); 00802 return false; 00803 } 00804 sqlite3_finalize( selectPurge ); 00805 while (!deleteFiles.empty()) { 00806 std::string fname = _meta._layerName + "_" + deleteFiles.back() +".osgb"; 00807 int run = unlink(fname.c_str()); 00808 if (run) { 00809 OE_WARN << "Error while removing file " << fname << std::endl; 00810 } 00811 deleteFiles.pop_back(); 00812 } 00813 } 00814 #endif 00815 rc = sqlite3_prepare_v2( db, _purgeLimitSQL.c_str(), _purgeLimitSQL.length(), &purge, 0L ); 00816 if ( rc != SQLITE_OK ) 00817 { 00818 OE_WARN << LC << "Failed to prepare SQL: " << _purgeLimitSQL << "; " << sqlite3_errmsg(db) << std::endl; 00819 return false; 00820 } 00821 sqlite3_bind_int( purge, 2, maxToRemove ); 00822 } 00823 00824 sqlite3_bind_int( purge, 1, utcTimeStamp ); 00825 00826 rc = sqlite3_step( purge ); 00827 if ( rc != SQLITE_DONE ) 00828 { 00829 // cache miss 00830 OE_DEBUG << LC << "Error purging records from \"" << _meta._layerName << "\"; " << sqlite3_errmsg(db) << std::endl; 00831 sqlite3_finalize(purge); 00832 return false; 00833 } 00834 00835 sqlite3_finalize(purge); 00836 _statsDeleted += maxToRemove; 00837 return true; 00838 } 00839 00843 bool initialize( sqlite3* db ) 00844 { 00845 // first create the table if it does not already exist: 00846 std::stringstream buf; 00847 buf << "CREATE TABLE IF NOT EXISTS \"" << _tableName << "\" (" 00848 << "key char(64) PRIMARY KEY UNIQUE, " 00849 << "created int, " 00850 << "accessed int, " 00851 #ifdef SPLIT_DB_FILE 00852 << "size int )"; 00853 #else 00854 << "data blob )"; 00855 #endif 00856 std::string sql = buf.str(); 00857 00858 OE_DEBUG << LC << "SQL = " << sql << std::endl; 00859 00860 char* errMsg = 0L; 00861 int rc = sqlite3_exec( db, sql.c_str(), 0L, 0L, &errMsg ); 00862 if ( rc != SQLITE_OK ) 00863 { 00864 OE_WARN << LC << "Creating layer \"" << _meta._layerName << "\": " << errMsg << std::endl; 00865 sqlite3_free( errMsg ); 00866 return false; 00867 } 00868 00869 // create an index on the time-last-accessed column 00870 buf.str(""); 00871 buf << "CREATE INDEX IF NOT EXISTS \"" 00872 << _tableName << "_lruindex\" " 00873 << "ON \"" << _tableName << "\" (accessed)"; 00874 sql = buf.str(); 00875 00876 OE_DEBUG << LC << "SQL = " << sql << std::endl; 00877 00878 rc = sqlite3_exec( db, sql.c_str(), 0L, 0L, &errMsg ); 00879 if ( rc != SQLITE_OK ) 00880 { 00881 OE_WARN << LC << "Creating index for layer \"" << _meta._layerName << "\": " << errMsg << std::endl; 00882 sqlite3_free( errMsg ); 00883 //return false; 00884 } 00885 00886 // next load the appropriate ReaderWriter: 00887 00888 #if OSG_MIN_VERSION_REQUIRED(2,9,5) 00889 _rw = osgDB::Registry::instance()->getReaderWriterForMimeType( _meta._format ); 00890 if ( !_rw.valid() ) 00891 #endif 00892 _rw = osgDB::Registry::instance()->getReaderWriterForExtension( _meta._format ); 00893 if ( !_rw.valid() ) 00894 { 00895 OE_WARN << LC << "Creating layer: Cannot initialize ReaderWriter for format \"" 00896 << _meta._format << "\"" << std::endl; 00897 return false; 00898 } 00899 00900 if ( !_meta._compressor.empty() ) 00901 _rwOptions = new osgDB::ReaderWriter::Options( "Compressor=" + _meta._compressor ); 00902 00903 _statsLastCheck = _statsStartTimer = osg::Timer::instance()->tick(); 00904 return true; 00905 } 00906 00907 std::string _selectSQL; 00908 std::string _insertSQL; 00909 std::string _updateTimeSQL; 00910 std::string _updateTimePoolSQL; 00911 00912 std::string _purgeSelect; 00913 std::string _purgeSQL; 00914 std::string _purgeLimitSQL; 00915 MetadataRecord _meta; 00916 std::string _tableName; 00917 00918 osg::ref_ptr<osgDB::ReaderWriter> _rw; 00919 osg::ref_ptr<osgDB::ReaderWriter::Options> _rwOptions; 00920 00921 osg::Timer_t _statsStartTimer; 00922 osg::Timer_t _statsLastCheck; 00923 00924 int _statsLoaded; 00925 int _statsStored; 00926 int _statsDeleted; 00927 00928 }; 00929 00930 // -------------------------------------------------------------------------- 00931 00932 typedef std::map<std::string,osg::ref_ptr<LayerTable> > LayerTablesByName; 00933 00934 // -------------------------------------------------------------------------- 00935 00936 //TODO: might want to move this up out of this plugin at some point. 00937 00938 struct AsyncPurge : public TaskRequest { 00939 AsyncPurge( const std::string& layerName, int olderThanUTC, Cache* cache ) 00940 : _layerName(layerName), _olderThanUTC(olderThanUTC), _cache(cache) { } 00941 00942 void operator()( ProgressCallback* progress ) { 00943 osg::ref_ptr<Cache> cache = _cache.get(); 00944 if ( cache.valid() ) 00945 cache->purge( _layerName, _olderThanUTC, false ); 00946 } 00947 00948 std::string _layerName; 00949 int _olderThanUTC; 00950 osg::observer_ptr<Cache> _cache; 00951 }; 00952 00953 struct AsyncInsert : public TaskRequest { 00954 AsyncInsert( const TileKey& key, const CacheSpec& spec, const osg::Image* image, AsyncCache* cache ) 00955 : _cacheSpec(spec), _key(key), _image(image), _cache(cache) { } 00956 00957 void operator()( ProgressCallback* progress ) { 00958 osg::ref_ptr<AsyncCache> cache = _cache.get(); 00959 if ( cache.valid() ) 00960 cache->setImageSync( _key, _cacheSpec, _image.get() ); 00961 } 00962 00963 CacheSpec _cacheSpec; 00964 TileKey _key; 00965 osg::ref_ptr<const osg::Image> _image; 00966 osg::observer_ptr<AsyncCache> _cache; 00967 }; 00968 00969 class Sqlite3Cache; 00970 struct AsyncUpdateAccessTime : public TaskRequest 00971 { 00972 AsyncUpdateAccessTime( const TileKey& key, const std::string& cacheId, int timeStamp, Sqlite3Cache* cache ); 00973 void operator()( ProgressCallback* progress ); 00974 00975 TileKey _key; 00976 std::string _cacheId; 00977 int _timeStamp; 00978 osg::observer_ptr<Sqlite3Cache> _cache; 00979 }; 00980 00981 00982 00983 struct AsyncUpdateAccessTimePool : public TaskRequest 00984 { 00985 AsyncUpdateAccessTimePool( const std::string& cacheId, Sqlite3Cache* cache ); 00986 void addEntry(const TileKey& key, int timeStamp); 00987 void addEntryInternal(const TileKey& key); 00988 00989 void operator()( ProgressCallback* progress ); 00990 const std::string& getCacheId() { return _cacheId; } 00991 int getNbEntry() const { return _keys.size(); } 00992 std::map<std::string, int> _keys; 00993 std::string _cacheId; 00994 std::string _keyStr; 00995 int _timeStamp; 00996 osg::observer_ptr<Sqlite3Cache> _cache; 00997 }; 00998 00999 01000 01001 // -------------------------------------------------------------------------- 01002 01003 struct ThreadTable { 01004 ThreadTable(LayerTable* table, sqlite3* db) : _table(table), _db(db) { } 01005 LayerTable* _table; 01006 sqlite3* _db; 01007 }; 01008 01009 class Sqlite3Cache : public AsyncCache 01010 { 01011 public: 01012 Sqlite3Cache( const CacheOptions& options ) 01013 : AsyncCache(options), _options(options), _db(0L) 01014 { 01015 if ( _options.path().get().empty() || options.getReferenceURI().empty() ) 01016 _databasePath = _options.path().get(); 01017 else 01018 { 01019 _databasePath = osgEarth::getFullPath( options.getReferenceURI(), _options.path().get() ); 01020 } 01021 01022 01023 setName( "sqlite3" ); 01024 01025 _nbRequest = 0; 01026 01027 //_settings = dynamic_cast<const Sqlite3CacheOptions*>( options ); 01028 //if ( !_settings.valid() ) 01029 // _settings = new Sqlite3CacheOptions( options ); 01030 01031 OE_INFO << LC << "options: " << _options.getConfig().toString() << std::endl; 01032 01033 if ( sqlite3_threadsafe() == 0 ) 01034 { 01035 OE_WARN << LC << "SQLITE3 IS NOT COMPILED IN THREAD-SAFE MODE" << std::endl; 01036 // TODO: something in this unlikely condition 01037 } 01038 01039 // enabled shared cache mode. 01040 //sqlite3_enable_shared_cache( 1 ); 01041 01042 #ifdef USE_L2_CACHE 01043 _L2cache = new MemCache(); 01044 _L2cache->setMaxNumTilesInCache( 64 ); 01045 OE_INFO << LC << "Using L2 memory cache" << std::endl; 01046 #endif 01047 01048 _db = openDatabase( _databasePath, _options.serialized().value() ); 01049 01050 if ( _db ) 01051 { 01052 if ( ! _metadata.initialize( _db ) ) 01053 _db = 0L; 01054 } 01055 01056 if ( _db && _options.asyncWrites() == true ) 01057 { 01058 _writeService = new osgEarth::TaskService( "Sqlite3Cache Write Service", 1 ); 01059 } 01060 01061 01062 if (!_metadata.loadAllLayers( _db, _layersList )) { 01063 OE_WARN << "can't read layers in meta data" << std::endl; 01064 } 01065 01066 } 01067 01068 // just here to satisfy the osg::Object requirements 01069 Sqlite3Cache() { } 01070 Sqlite3Cache( const Sqlite3Cache& rhs, const osg::CopyOp& op ) { } 01071 META_Object(osgEarth,Sqlite3Cache); 01072 01073 public: // Cache interface 01074 01078 bool isCached( const TileKey& key, const CacheSpec& spec ) const 01079 { 01080 // this looks ineffecient, but usually when isCached() is called, getImage() will be 01081 // called soon thereafter. And this call will load it into the L2 cache so the subsequent 01082 // getImage call will not hit the DB again. 01083 osg::ref_ptr<const osg::Image> temp; 01084 return const_cast<Sqlite3Cache*>(this)->getImage( key, spec, temp ); 01085 } 01086 01090 virtual void storeProperties( const CacheSpec& spec, const Profile* profile, unsigned int tileSize ) 01091 { 01092 if ( !_db ) return; 01093 01094 if ( spec.cacheId().empty() || profile == 0L || spec.format().empty() ) 01095 { 01096 OE_WARN << "ILLEGAL: cannot cache a layer without a layer id" << std::endl; 01097 return; 01098 } 01099 01100 ScopedLock<Mutex> lock( _tableListMutex ); // b/c we're using the base db handle 01101 #ifdef SPLIT_LAYER_DB 01102 sqlite3* db = getOrCreateMetaDbForThread(); 01103 #else 01104 sqlite3* db = getOrCreateDbForThread(); 01105 #endif 01106 if ( !db ) 01107 return; 01108 01109 //OE_INFO << "Storing metadata for layer \"" << layerName << "\"" << std::endl; 01110 01111 MetadataRecord rec; 01112 rec._layerName = spec.cacheId(); 01113 rec._profile = profile; 01114 rec._tileSize = tileSize; 01115 01116 #ifdef USE_SERIALIZERS 01117 rec._format = "osgb"; 01118 rec._compressor = "zlib"; 01119 #else 01120 rec._format = spec.format(); 01121 #endif 01122 01123 _metadata.store( rec, db ); 01124 } 01125 01129 virtual bool loadProperties( 01130 const std::string& cacheId, 01131 CacheSpec& out_spec, 01132 osg::ref_ptr<const Profile>& out_profile, 01133 unsigned int& out_tileSize ) 01134 { 01135 if ( !_db ) return 0L; 01136 01137 ScopedLock<Mutex> lock( _tableListMutex ); // b/c we're using the base db handle 01138 01139 #ifdef SPLIT_LAYER_DB 01140 sqlite3* db = getOrCreateMetaDbForThread(); 01141 #else 01142 sqlite3* db = getOrCreateDbForThread(); 01143 #endif 01144 if ( !db ) 01145 return 0L; 01146 01147 OE_DEBUG << LC << "Loading metadata for layer \"" << cacheId << "\"" << std::endl; 01148 01149 MetadataRecord rec; 01150 if ( _metadata.load( cacheId, db, rec ) ) 01151 { 01152 out_spec = CacheSpec( rec._layerName, rec._format ); 01153 out_tileSize = rec._tileSize; 01154 out_profile = rec._profile; 01155 } 01156 return 0L; 01157 } 01158 01162 bool getImage( const TileKey& key, const CacheSpec& spec, osg::ref_ptr<const osg::Image>& out_image ) 01163 { 01164 if ( !_db ) return false; 01165 01166 // wait if we are purging the db 01167 ScopedLock<Mutex> lock2( _pendingPurgeMutex ); 01168 01169 // first try the L2 cache. 01170 if ( _L2cache.valid() ) 01171 { 01172 if ( _L2cache->getImage( key, spec, out_image ) ) 01173 return true; 01174 } 01175 01176 // next check the deferred-write queue. 01177 if ( _options.asyncWrites() == true ) 01178 { 01179 #ifdef INSERT_POOL 01180 ScopedLock<Mutex> lock( _pendingWritesMutex ); 01181 std::string name = layerName; 01182 std::map<std::string, osg::ref_ptr<AsyncInsertPool> >::iterator it = _pendingWrites.find(name); 01183 if (it != _pendingWrites.end()) { 01184 AsyncInsertPool* p = it->second.get(); 01185 if (p) { 01186 osg::Image* img = p->findImage(key.str()); 01187 if (img) { 01188 // todo: update the access time, or let it slide? 01189 OE_DEBUG << LC << "Got key that is write-queued: " << key.str() << std::endl; 01190 return img; 01191 } 01192 } 01193 } 01194 #else 01195 ScopedLock<Mutex> lock( _pendingWritesMutex ); 01196 std::string name = key.str() + spec.cacheId(); //layerName; 01197 std::map<std::string,osg::ref_ptr<AsyncInsert> >::iterator i = _pendingWrites.find(name); 01198 if ( i != _pendingWrites.end() ) 01199 { 01200 // todo: update the access time, or let it slide? 01201 OE_DEBUG << LC << "Got key that is write-queued: " << key.str() << std::endl; 01202 out_image = i->second->_image.get(); 01203 return out_image.valid(); 01204 //return i->second->_image.get(); 01205 } 01206 #endif 01207 } 01208 01209 // finally, try to query the database. 01210 ThreadTable tt = getTable( spec.cacheId() ); //layerName); 01211 if ( tt._table ) 01212 { 01213 ImageRecord rec( key ); 01214 if (!tt._table->load( key, rec, tt._db )) 01215 return false; 01216 01217 // load it into the L2 cache 01218 out_image = rec._image.release(); 01219 01220 if ( out_image.valid() && _L2cache.valid() ) 01221 _L2cache->setImage( key, spec, out_image.get() ); 01222 01223 #ifdef UPDATE_ACCESS_TIMES 01224 01225 #ifdef UPDATE_ACCESS_TIMES_POOL 01226 // update the last-access time 01227 int t = (int)::time(0L); 01228 { 01229 ScopedLock<Mutex> lock( _pendingUpdateMutex ); 01230 osg::ref_ptr<AsyncUpdateAccessTimePool> pool; 01231 std::map<std::string,osg::ref_ptr<AsyncUpdateAccessTimePool> >::iterator i = _pendingUpdates.find( spec.cacheId() ); //layerName); 01232 if ( i != _pendingUpdates.end() ) 01233 { 01234 i->second->addEntry(key, t); 01235 pool = i->second; 01236 OE_DEBUG << LC << "Add key " << key.str() << " to existing layer batch " << spec.name() << std::endl; 01237 } else { 01238 pool = new AsyncUpdateAccessTimePool(spec.cacheId(), this); 01239 pool->addEntry(key, t); 01240 _pendingUpdates[spec.cacheId()] = pool.get(); 01241 _writeService->add(pool.get()); 01242 } 01243 } 01244 #else 01245 // update the last-access time 01246 int t = (int)::time(0L); 01247 _writeService->add( new AsyncUpdateAccessTime( key, layerName, t, this ) ); 01248 #endif 01249 01250 #endif // UPDATE_ACCESS_TIMES 01251 01252 return out_image.valid(); 01253 } 01254 else 01255 { 01256 OE_DEBUG << LC << "What, no layer table?" << std::endl; 01257 } 01258 return false; 01259 } 01260 01264 void setImage( const TileKey& key, const CacheSpec& spec, const osg::Image* image ) 01265 { 01266 if ( !_db ) return; 01267 01268 if ( _options.asyncWrites() == true ) 01269 { 01270 // the "pending writes" table is here so that we don't try to write data to 01271 // the cache more than once when using an asynchronous write service. 01272 ScopedLock<Mutex> lock( _pendingWritesMutex ); 01273 #ifdef INSERT_POOL 01274 std::string name = layerName; 01275 std::map<std::string, osg::ref_ptr<AsyncInsertPool> >::iterator it = _pendingWrites.find(name); 01276 if ( it == _pendingWrites.end() ) 01277 { 01278 AsyncInsertPool* req = new AsyncInsertPool(layerName, this); 01279 req->addEntry(key, format, image); 01280 _pendingWrites[name] = req; 01281 _writeService->add( req ); 01282 } 01283 else 01284 { 01285 it->second->addEntry(key, format, image); 01286 } 01287 #else 01288 std::string name = key.str() + spec.cacheId(); 01289 if ( _pendingWrites.find(name) == _pendingWrites.end() ) 01290 { 01291 AsyncInsert* req = new AsyncInsert(key, spec, image, this); 01292 _pendingWrites[name] = req; 01293 _writeService->add( req ); 01294 } 01295 else 01296 { 01297 //NOTE: this should probably never happen. 01298 OE_WARN << LC << "Tried to setImage; already in queue: " << key.str() << std::endl; 01299 } 01300 #endif 01301 } 01302 else 01303 { 01304 01305 setImageSync( key, spec, image ); 01306 } 01307 } 01308 01312 bool purge( const std::string& layerName, int olderThanUTC, bool async ) 01313 { 01314 if ( !_db ) return false; 01315 01316 // purge the L2 cache first: 01317 if ( async == true && _options.asyncWrites() == true ) 01318 { 01319 #ifdef PURGE_GENERAL 01320 if (!_pendingPurges.empty()) 01321 return false; 01322 ScopedLock<Mutex> lock( _pendingPurgeMutex ); 01323 AsyncPurge* req = new AsyncPurge(layerName, olderThanUTC, this); 01324 _writeService->add( req); 01325 _pendingPurges[layerName] = req; 01326 #else 01327 if (_pendingPurges.find(layerName) != _pendingPurges.end()) { 01328 return false; 01329 } else { 01330 ScopedLock<Mutex> lock( _pendingPurgeMutex ); 01331 AsyncPurge* req = new AsyncPurge(layerName, olderThanUTC, this); 01332 _writeService->add( req); 01333 _pendingPurges[layerName] = req; 01334 } 01335 #endif 01336 } 01337 else 01338 { 01339 #ifdef PURGE_GENERAL 01340 ScopedLock<Mutex> lock( _pendingPurgeMutex ); 01341 01342 sqlite3_int64 limit = _options.maxSize().value() * 1024 * 1024; 01343 std::map<std::string, std::pair<sqlite3_int64,int> > layers; 01344 sqlite3_int64 totalSize = 0; 01345 for (unsigned int i = 0; i < _layersList.size(); ++i) { 01346 ThreadTable tt = getTable( _layersList[i] ); 01347 if ( tt._table ) { 01348 sqlite3_int64 size = tt._table->getTableSize(tt._db); 01349 layers[_layersList[i] ].first = size; 01350 layers[_layersList[i] ].second = tt._table->getNbEntry(tt._db); 01351 totalSize += size; 01352 } 01353 } 01354 OE_INFO << LC << "SQlite cache size " << totalSize/(1024*1024) << " MB" << std::endl; 01355 if (totalSize > 1.2 * limit) { 01356 sqlite3_int64 diff = totalSize - limit; 01357 for (unsigned int i = 0; i < _layersList.size(); ++i) { 01358 float ratio = layers[_layersList[i] ].first * 1.0 / (float)(totalSize); 01359 int sizeToRemove = (int)floor(ratio * diff); 01360 if (sizeToRemove > 0) { 01361 if (sizeToRemove / 1024 > 1024) { 01362 OE_DEBUG << "Try to remove " << sizeToRemove/(1024*1024) << " MB in " << _layersList[i] << std::endl; 01363 } else { 01364 OE_DEBUG << "Try to remove " << sizeToRemove/1024 << " KB in " << _layersList[i] << std::endl; 01365 } 01366 01367 if ( _L2cache.valid() ) 01368 _L2cache->purge( _layersList[i], olderThanUTC, async ); 01369 ThreadTable tt = getTable(_layersList[i]); 01370 if ( tt._table ) { 01371 float averageSizePerElement = layers[_layersList[i] ].first * 1.0 /layers[_layersList[i] ].second; 01372 int nb = (int)floor(sizeToRemove / averageSizePerElement); 01373 if (nb ) { 01374 OE_DEBUG << "remove " << nb << " / " << layers[_layersList[i] ].second << " elements in " << _layersList[i] << std::endl; 01375 tt._table->purge(olderThanUTC, nb, tt._db); 01376 } 01377 } 01378 } 01379 } 01380 } 01381 _pendingPurges.clear(); 01382 displayPendingOperations(); 01383 01384 #else 01385 ScopedLock<Mutex> lock( _pendingPurgeMutex ); 01386 if ( _L2cache.valid() ) 01387 _L2cache->purge( layerName, olderThanUTC, async ); 01388 01389 ThreadTable tt = getTable( layerName ); 01390 if ( tt._table ) 01391 { 01392 _pendingPurges.erase( layerName ); 01393 01394 unsigned int maxsize = _options.getSize(layerName); 01395 tt._table->checkAndPurgeIfNeeded(tt._db, maxsize * 1024 * 1024); 01396 displayPendingOperations(); 01397 } 01398 #endif 01399 } 01400 return true; 01401 } 01402 01406 bool updateAccessTimeSync( const std::string& layerName, const TileKey& key, int newTimestamp ) 01407 { 01408 if ( !_db ) return false; 01409 01410 ThreadTable tt = getTable(layerName); 01411 if ( tt._table ) 01412 { 01413 tt._table->updateAccessTime( key, newTimestamp, tt._db ); 01414 } 01415 return true; 01416 } 01417 01421 bool updateAccessTimeSyncPool( const std::string& layerName, const std::string& keys, int newTimestamp ) 01422 { 01423 if ( !_db ) return false; 01424 01425 ThreadTable tt = getTable(layerName); 01426 if ( tt._table ) 01427 { 01428 tt._table->updateAccessTimePool( keys, newTimestamp, tt._db ); 01429 } 01430 01431 { 01432 ScopedLock<Mutex> lock( _pendingUpdateMutex ); 01433 _pendingUpdates.erase( layerName ); 01434 displayPendingOperations(); 01435 } 01436 return true; 01437 } 01438 01439 #ifdef INSERT_POOL 01440 void setImageSyncPool( AsyncInsertPool* pool, const std::string& layerName) 01441 { 01442 ScopedLock<Mutex> lock( _pendingWritesMutex ); 01443 const AsyncInsertPool::PoolContainer& entries = pool->_pool; 01444 OE_WARN << "write " << entries.size() << std::endl; 01445 ThreadTable tt = getTable(layerName); 01446 if ( tt._table ) 01447 { 01448 sqlite3_stmt* insert; 01449 if (!tt._table->beginStore( tt._db, insert )) { 01450 return; 01451 } 01452 if (!tt._table->storePool( tt._db, entries, insert )) { 01453 return; 01454 } 01455 if (!tt._table->endStore( tt._db, insert )) { 01456 return; 01457 } 01458 } 01459 _pendingWrites.erase( layerName ); 01460 displayPendingOperations(); 01461 } 01462 #endif 01463 01464 private: 01465 01466 void displayPendingOperations() { 01467 if (_pendingWrites.size()) 01468 OE_DEBUG<< LC << "pending insert " << _pendingWrites.size() << std::endl; 01469 if (_pendingUpdates.size()) 01470 OE_DEBUG << LC << "pending update " << _pendingUpdates.size() << std::endl; 01471 if (_pendingPurges.size()) 01472 OE_DEBUG << LC << "pending purge " << _pendingPurges.size() << std::endl; 01473 //OE_INFO << LC << "Pending writes: " << std::dec << _writeService->getNumRequests() << std::endl; 01474 } 01475 01476 void setImageSync( const TileKey& key, const CacheSpec& spec, const osg::Image* image ) 01477 { 01478 if (_options.maxSize().value() > 0 && _nbRequest > MAX_REQUEST_TO_RUN_PURGE) { 01479 int t = (int)::time(0L); 01480 purge(spec.cacheId(), t, _options.asyncWrites().value() ); 01481 _nbRequest = 0; 01482 } 01483 _nbRequest++; 01484 01485 ThreadTable tt = getTable( spec.cacheId() ); 01486 if ( tt._table ) 01487 { 01488 ::time_t t = ::time(0L); 01489 ImageRecord rec( key ); 01490 rec._created = (int)t; 01491 rec._accessed = (int)t; 01492 rec._image = image; 01493 01494 tt._table->store( rec, tt._db ); 01495 } 01496 01497 if ( _options.asyncWrites() == true ) 01498 { 01499 ScopedLock<Mutex> lock( _pendingWritesMutex ); 01500 std::string name = key.str() + spec.cacheId(); 01501 _pendingWrites.erase( name ); 01502 displayPendingOperations(); 01503 } 01504 } 01505 01506 01507 #ifdef SPLIT_LAYER_DB 01508 sqlite3* getOrCreateDbForThread(const std::string& layer) 01509 { 01510 sqlite3* db = 0L; 01511 01512 // this method assumes the thread already holds a lock on _tableListMutex, which 01513 // doubles to protect _dbPerThread 01514 01515 Thread* thread = Thread::CurrentThread(); 01516 std::map<Thread*,sqlite3*>::const_iterator k = _dbPerThreadLayers[layer].find(thread); 01517 if ( k == _dbPerThreadLayers[layer].end() ) 01518 { 01519 db = openDatabase( layer + _options.path().value(), _options.serialized().value() ); 01520 if ( db ) 01521 { 01522 _dbPerThreadLayers[layer][thread] = db; 01523 OE_INFO << LC << "Created DB handle " << std::hex << db << " for thread " << thread << std::endl; 01524 } 01525 else 01526 { 01527 OE_WARN << LC << "Failed to open DB on thread " << thread << std::endl; 01528 } 01529 } 01530 else 01531 { 01532 db = k->second; 01533 } 01534 01535 return db; 01536 } 01537 01538 01539 sqlite3* getOrCreateMetaDbForThread() 01540 { 01541 sqlite3* db = 0L; 01542 01543 // this method assumes the thread already holds a lock on _tableListMutex, which 01544 // doubles to protect _dbPerThread 01545 01546 Thread* thread = Thread::CurrentThread(); 01547 std::map<Thread*,sqlite3*>::const_iterator k = _dbPerThreadMeta.find(thread); 01548 if ( k == _dbPerThreadMeta.end() ) 01549 { 01550 db = openDatabase( _options.path().value(), _options.serialized().value() ); 01551 if ( db ) 01552 { 01553 _dbPerThreadMeta[thread] = db; 01554 OE_INFO << LC << "Created DB handle " << std::hex << db << " for thread " << thread << std::endl; 01555 } 01556 else 01557 { 01558 OE_WARN << LC << "Failed to open DB on thread " << thread << std::endl; 01559 } 01560 } 01561 else 01562 { 01563 db = k->second; 01564 } 01565 01566 return db; 01567 } 01568 01569 #else 01570 sqlite3* getOrCreateDbForThread() 01571 { 01572 sqlite3* db = 0L; 01573 01574 // this method assumes the thread already holds a lock on _tableListMutex, which 01575 // doubles to protect _dbPerThread 01576 01577 Thread* thread = Thread::CurrentThread(); 01578 std::map<Thread*,sqlite3*>::const_iterator k = _dbPerThread.find(thread); 01579 if ( k == _dbPerThread.end() ) 01580 { 01581 db = openDatabase( _databasePath, _options.serialized().value() ); 01582 if ( db ) 01583 { 01584 _dbPerThread[thread] = db; 01585 OE_DEBUG << LC << "Created DB handle " << std::hex << db << " for thread " << thread << std::endl; 01586 } 01587 else 01588 { 01589 OE_WARN << LC << "Failed to open DB on thread " << thread << std::endl; 01590 } 01591 } 01592 else 01593 { 01594 db = k->second; 01595 } 01596 01597 return db; 01598 } 01599 #endif 01600 01601 // gets the layer table for the specified layer name, creating it if it does 01602 // not already exist... 01603 ThreadTable getTable( const std::string& tableName ) 01604 { 01605 ScopedLock<Mutex> lock( _tableListMutex ); 01606 01607 #ifdef SPLIT_LAYER_DB 01608 sqlite3* db = getOrCreateDbForThread(tableName); 01609 #else 01610 sqlite3* db = getOrCreateDbForThread(); 01611 #endif 01612 if ( !db ) 01613 return ThreadTable( 0L, 0L ); 01614 01615 LayerTablesByName::iterator i = _tables.find(tableName); 01616 if ( i == _tables.end() ) 01617 { 01618 MetadataRecord meta; 01619 #ifdef SPLIT_LAYER_DB 01620 sqlite3* metadb = getOrCreateMetaDbForThread(); 01621 if ( !_metadata.load( tableName, metadb, meta ) ) 01622 #else 01623 if ( !_metadata.load( tableName, db, meta ) ) 01624 #endif 01625 { 01626 OE_WARN << LC << "Cannot operate on \"" << tableName << "\" because metadata does not exist." 01627 << std::endl; 01628 return ThreadTable( 0L, 0L ); 01629 } 01630 01631 _tables[tableName] = new LayerTable( meta, db ); 01632 OE_DEBUG << LC << "New LayerTable for " << tableName << std::endl; 01633 } 01634 return ThreadTable( _tables[tableName].get(), db ); 01635 } 01636 01637 private: 01638 01639 const Sqlite3CacheOptions _options; 01640 //osg::ref_ptr<const Sqlite3CacheOptions> _settings; 01641 osg::ref_ptr<osgDB::ReaderWriter> _defaultRW; 01642 Mutex _tableListMutex; 01643 MetadataTable _metadata; 01644 LayerTablesByName _tables; 01645 01646 bool _useAsyncWrites; 01647 osg::ref_ptr<TaskService> _writeService; 01648 Mutex _pendingWritesMutex; 01649 01650 #ifdef INSERT_POOL 01651 std::map<std::string, osg::ref_ptr<AsyncInsertPool> > _pendingWrites; 01652 #else 01653 std::map<std::string, osg::ref_ptr<AsyncInsert> > _pendingWrites; 01654 #endif 01655 Mutex _pendingUpdateMutex; 01656 std::map<std::string, osg::ref_ptr<AsyncUpdateAccessTimePool> > _pendingUpdates; 01657 01658 Mutex _pendingPurgeMutex; 01659 std::map<std::string, osg::ref_ptr<AsyncPurge> > _pendingPurges; 01660 01661 sqlite3* _db; 01662 std::map<Thread*,sqlite3*> _dbPerThread; 01663 01664 std::map<std::string, std::map<Thread*,sqlite3*> > _dbPerThreadLayers; 01665 std::map<Thread*,sqlite3*> _dbPerThreadMeta; 01666 01667 osg::ref_ptr<MemCache> _L2cache; 01668 01669 int _count; 01670 int _nbRequest; 01671 01672 std::vector<std::string> _layersList; 01673 std::string _databasePath; 01674 }; 01675 01676 01677 01678 AsyncUpdateAccessTime::AsyncUpdateAccessTime( const TileKey& key, const std::string& cacheId, int timeStamp, Sqlite3Cache* cache ) : 01679 _key(key), _cacheId(cacheId), _timeStamp(timeStamp), _cache(cache) 01680 { 01681 //nop 01682 } 01683 01684 void AsyncUpdateAccessTime::operator()( ProgressCallback* progress ) 01685 { 01686 osg::ref_ptr<Sqlite3Cache> cache = _cache.get(); 01687 if ( cache.valid() ) { 01688 //OE_WARN << "AsyncUpdateAccessTime will process " << _key << std::endl; 01689 cache->updateAccessTimeSync( _cacheId, _key , _timeStamp ); 01690 } 01691 } 01692 01693 01694 AsyncUpdateAccessTimePool::AsyncUpdateAccessTimePool(const std::string& cacheId, Sqlite3Cache* cache) : 01695 _cacheId(cacheId), _cache(cache) 01696 { 01697 //nop 01698 } 01699 01700 void AsyncUpdateAccessTimePool::addEntry(const TileKey& key, int timeStamp) 01701 { 01702 unsigned int lod = key.getLevelOfDetail(); 01703 addEntryInternal(key); 01704 if (lod > 0) { 01705 TileKey previous = key; 01706 for (int i = (int)lod-1; i >= 0; --i) { 01707 TileKey ancestor = previous.createAncestorKey(i); 01708 if (ancestor.valid()) 01709 addEntryInternal(ancestor); 01710 previous = ancestor; 01711 } 01712 } 01713 _timeStamp = timeStamp; 01714 } 01715 01716 void AsyncUpdateAccessTimePool::addEntryInternal(const TileKey& key) 01717 { 01718 const std::string& keyStr = key.str(); 01719 if (_keys.find(keyStr) == _keys.end()) { 01720 _keys[keyStr] = 1; 01721 if (_keyStr.empty()) 01722 _keyStr = keyStr; 01723 else 01724 _keyStr += ", " + keyStr; 01725 } 01726 } 01727 01728 void AsyncUpdateAccessTimePool::operator()( ProgressCallback* progress ) 01729 { 01730 osg::ref_ptr<Sqlite3Cache> cache = _cache.get(); 01731 if ( cache.valid() ) { 01732 //OE_INFO << "AsyncUpdateAccessTimePool will process " << _keys.size() << std::endl; 01733 cache->updateAccessTimeSyncPool( _cacheId, _keyStr , _timeStamp ); 01734 } 01735 } 01736 01737 01738 #ifdef INSERT_POOL 01739 AsyncInsertPool::AsyncInsertPool(const std::string& layerName, Sqlite3Cache* cache ) : _layerName(layerName), _cache(cache) { } 01740 void AsyncInsertPool::operator()( ProgressCallback* progress ) 01741 { 01742 osg::ref_ptr<Sqlite3Cache> cache = _cache.get(); 01743 if ( cache.valid() ) { 01744 cache->setImageSyncPool( this, _layerName ); 01745 } 01746 } 01747 #endif 01748 01749 //------------------------------------------------------------------------ 01750 01758 class Sqlite3CacheFactory : public CacheDriver 01759 { 01760 public: 01761 Sqlite3CacheFactory() 01762 { 01763 supportsExtension( "osgearth_cache_sqlite3", "Sqlite3 Cache for osgEarth" ); 01764 } 01765 01766 virtual const char* className() 01767 { 01768 return "Sqlite3 Cache for osgEarth"; 01769 } 01770 01771 virtual ReadResult readObject(const std::string& file_name, const Options* options) const 01772 { 01773 if ( !acceptsExtension(osgDB::getLowerCaseFileExtension( file_name ))) 01774 return ReadResult::FILE_NOT_HANDLED; 01775 01776 return ReadResult( new Sqlite3Cache( getCacheOptions(options) ) ); 01777 } 01778 }; 01779 01780 REGISTER_OSGPLUGIN(osgearth_cache_sqlite3, Sqlite3CacheFactory) 01781