osgEarth 2.1.1
Public Member Functions | Private Member Functions | Private Attributes

Sqlite3Cache Class Reference

Inheritance diagram for Sqlite3Cache:
Collaboration diagram for Sqlite3Cache:

List of all members.

Public Member Functions

 Sqlite3Cache (const CacheOptions &options)
 Sqlite3Cache ()
 Sqlite3Cache (const Sqlite3Cache &rhs, const osg::CopyOp &op)
 META_Object (osgEarth, Sqlite3Cache)
bool isCached (const TileKey &key, const CacheSpec &spec) const
virtual void storeProperties (const CacheSpec &spec, const Profile *profile, unsigned int tileSize)
virtual bool loadProperties (const std::string &cacheId, CacheSpec &out_spec, osg::ref_ptr< const Profile > &out_profile, unsigned int &out_tileSize)
bool getImage (const TileKey &key, const CacheSpec &spec, osg::ref_ptr< const osg::Image > &out_image)
void setImage (const TileKey &key, const CacheSpec &spec, const osg::Image *image)
bool purge (const std::string &layerName, int olderThanUTC, bool async)
bool updateAccessTimeSync (const std::string &layerName, const TileKey &key, int newTimestamp)
bool updateAccessTimeSyncPool (const std::string &layerName, const std::string &keys, int newTimestamp)

Private Member Functions

void displayPendingOperations ()
void setImageSync (const TileKey &key, const CacheSpec &spec, const osg::Image *image)
sqlite3 * getOrCreateDbForThread ()
ThreadTable getTable (const std::string &tableName)

Private Attributes

const Sqlite3CacheOptions _options
osg::ref_ptr< osgDB::ReaderWriter > _defaultRW
Mutex _tableListMutex
MetadataTable _metadata
LayerTablesByName _tables
bool _useAsyncWrites
osg::ref_ptr< TaskService_writeService
Mutex _pendingWritesMutex
std::map< std::string,
osg::ref_ptr< AsyncInsert > > 
_pendingWrites
Mutex _pendingUpdateMutex
std::map< std::string,
osg::ref_ptr
< AsyncUpdateAccessTimePool > > 
_pendingUpdates
Mutex _pendingPurgeMutex
std::map< std::string,
osg::ref_ptr< AsyncPurge > > 
_pendingPurges
sqlite3 * _db
std::map< Thread *, sqlite3 * > _dbPerThread
std::map< std::string,
std::map< Thread *, sqlite3 * > > 
_dbPerThreadLayers
std::map< Thread *, sqlite3 * > _dbPerThreadMeta
osg::ref_ptr< MemCache_L2cache
int _count
int _nbRequest
std::vector< std::string > _layersList
std::string _databasePath

Detailed Description

Definition at line 1009 of file Sqlite3Cache.cpp.


Constructor & Destructor Documentation

Sqlite3Cache::Sqlite3Cache ( const CacheOptions options) [inline]

Definition at line 1012 of file Sqlite3Cache.cpp.

      : AsyncCache(options), _options(options),  _db(0L)
    {                
        if ( _options.path().get().empty() || options.getReferenceURI().empty() )
            _databasePath = _options.path().get();
        else
        {
           _databasePath = osgEarth::getFullPath( options.getReferenceURI(), _options.path().get() );
        }

        
        setName( "sqlite3" );

        _nbRequest = 0;

        //_settings = dynamic_cast<const Sqlite3CacheOptions*>( options );
        //if ( !_settings.valid() )
        //    _settings = new Sqlite3CacheOptions( options );

        OE_INFO << LC << "options: " << _options.getConfig().toString() << std::endl;

        if ( sqlite3_threadsafe() == 0 )
        {
            OE_WARN << LC << "SQLITE3 IS NOT COMPILED IN THREAD-SAFE MODE" << std::endl;
            // TODO: something in this unlikely condition
        }

        // enabled shared cache mode.
        //sqlite3_enable_shared_cache( 1 );

#ifdef USE_L2_CACHE
        _L2cache = new MemCache();
        _L2cache->setMaxNumTilesInCache( 64 );
        OE_INFO << LC << "Using L2 memory cache" << std::endl;
#endif
        
        _db = openDatabase( _databasePath, _options.serialized().value() );

        if ( _db )
        {
            if ( ! _metadata.initialize( _db ) )
                _db = 0L;
        }

        if ( _db && _options.asyncWrites() == true )
        {
            _writeService = new osgEarth::TaskService( "Sqlite3Cache Write Service", 1 );
        }

        
        if (!_metadata.loadAllLayers( _db, _layersList )) {
            OE_WARN << "can't read layers in meta data" << std::endl;
        }

    }

Here is the call graph for this function:

Sqlite3Cache::Sqlite3Cache ( ) [inline]

Definition at line 1069 of file Sqlite3Cache.cpp.

{ }
Sqlite3Cache::Sqlite3Cache ( const Sqlite3Cache rhs,
const osg::CopyOp &  op 
) [inline]

Definition at line 1070 of file Sqlite3Cache.cpp.

{ }

Member Function Documentation

void Sqlite3Cache::displayPendingOperations ( ) [inline, private]

Definition at line 1466 of file Sqlite3Cache.cpp.

                                    {
        if (_pendingWrites.size())
            OE_DEBUG<< LC << "pending insert " << _pendingWrites.size() << std::endl;
        if (_pendingUpdates.size())
            OE_DEBUG << LC << "pending update " << _pendingUpdates.size() << std::endl;
        if (_pendingPurges.size())
            OE_DEBUG << LC << "pending purge " << _pendingPurges.size() << std::endl;
        //OE_INFO << LC << "Pending writes: " << std::dec << _writeService->getNumRequests() << std::endl;
    }
bool Sqlite3Cache::getImage ( const TileKey key,
const CacheSpec spec,
osg::ref_ptr< const osg::Image > &  out_image 
) [inline, virtual]

Gets the cached image for the given TileKey

Implements osgEarth::Cache.

Definition at line 1162 of file Sqlite3Cache.cpp.

    {
        if ( !_db ) return false;

        // wait if we are purging the db
        ScopedLock<Mutex> lock2( _pendingPurgeMutex );

        // first try the L2 cache.
        if ( _L2cache.valid() )
        {
            if ( _L2cache->getImage( key, spec, out_image ) )
                return true;
        }

        // next check the deferred-write queue.
        if ( _options.asyncWrites() == true )
        {
#ifdef INSERT_POOL
            ScopedLock<Mutex> lock( _pendingWritesMutex );
            std::string name = layerName;
            std::map<std::string, osg::ref_ptr<AsyncInsertPool> >::iterator it = _pendingWrites.find(name);
            if (it != _pendingWrites.end()) {
                AsyncInsertPool* p = it->second.get();
                if (p) {
                    osg::Image* img = p->findImage(key.str());
                    if (img) {
                        // todo: update the access time, or let it slide?
                        OE_DEBUG << LC << "Got key that is write-queued: " << key.str() << std::endl;
                        return img;
                    }
                }
            }
#else
            ScopedLock<Mutex> lock( _pendingWritesMutex );
            std::string name = key.str() + spec.cacheId(); //layerName;
            std::map<std::string,osg::ref_ptr<AsyncInsert> >::iterator i = _pendingWrites.find(name);
            if ( i != _pendingWrites.end() )
            {
                // todo: update the access time, or let it slide?
                OE_DEBUG << LC << "Got key that is write-queued: " << key.str() << std::endl;
                out_image = i->second->_image.get();
                return out_image.valid();
                //return i->second->_image.get();
            }
#endif
        }

        // finally, try to query the database.
        ThreadTable tt = getTable( spec.cacheId() ); //layerName);
        if ( tt._table )
        {
            ImageRecord rec( key );
            if (!tt._table->load( key, rec, tt._db ))
                return false;

            // load it into the L2 cache
            out_image = rec._image.release();

            if ( out_image.valid() && _L2cache.valid() )
                _L2cache->setImage( key, spec, out_image.get() );

#ifdef UPDATE_ACCESS_TIMES

#ifdef UPDATE_ACCESS_TIMES_POOL
            // update the last-access time
            int t = (int)::time(0L);
            {
                ScopedLock<Mutex> lock( _pendingUpdateMutex );
                osg::ref_ptr<AsyncUpdateAccessTimePool> pool;
                std::map<std::string,osg::ref_ptr<AsyncUpdateAccessTimePool> >::iterator i = _pendingUpdates.find( spec.cacheId() ); //layerName);
                if ( i != _pendingUpdates.end() )
                {
                    i->second->addEntry(key, t);
                    pool = i->second;
                    OE_DEBUG << LC << "Add key " << key.str() << " to existing layer batch " << spec.name() << std::endl;
                } else {
                    pool = new AsyncUpdateAccessTimePool(spec.cacheId(), this);
                    pool->addEntry(key, t);
                    _pendingUpdates[spec.cacheId()] = pool.get();
                    _writeService->add(pool.get());
                }
            }
#else
            // update the last-access time
            int t = (int)::time(0L);
            _writeService->add( new AsyncUpdateAccessTime(  key, layerName, t, this ) );
#endif

#endif // UPDATE_ACCESS_TIMES

            return out_image.valid();
        }
        else
        {
            OE_DEBUG << LC << "What, no layer table?" << std::endl;
        }
        return false;
    }

Here is the call graph for this function:

sqlite3* Sqlite3Cache::getOrCreateDbForThread ( ) [inline, private]

Definition at line 1570 of file Sqlite3Cache.cpp.

    {
        sqlite3* db = 0L;

        // this method assumes the thread already holds a lock on _tableListMutex, which
        // doubles to protect _dbPerThread

        Thread* thread = Thread::CurrentThread();
        std::map<Thread*,sqlite3*>::const_iterator k = _dbPerThread.find(thread);
        if ( k == _dbPerThread.end() )
        {
            db = openDatabase( _databasePath, _options.serialized().value() );
            if ( db )
            {
                _dbPerThread[thread] = db;
                OE_DEBUG << LC << "Created DB handle " << std::hex << db << " for thread " << thread << std::endl;
            }
            else
            {
                OE_WARN << LC << "Failed to open DB on thread " << thread << std::endl;
            }
        }
        else
        {
            db = k->second;
        }

        return db;
    }

Here is the call graph for this function:

ThreadTable Sqlite3Cache::getTable ( const std::string &  tableName) [inline, private]

Definition at line 1603 of file Sqlite3Cache.cpp.

    {
        ScopedLock<Mutex> lock( _tableListMutex );

#ifdef SPLIT_LAYER_DB
        sqlite3* db = getOrCreateDbForThread(tableName);
#else
        sqlite3* db = getOrCreateDbForThread();
#endif
        if ( !db )
            return ThreadTable( 0L, 0L );

        LayerTablesByName::iterator i = _tables.find(tableName);
        if ( i == _tables.end() )
        {
            MetadataRecord meta;
#ifdef SPLIT_LAYER_DB
            sqlite3* metadb = getOrCreateMetaDbForThread();
            if ( !_metadata.load( tableName, metadb, meta ) )
#else
            if ( !_metadata.load( tableName, db, meta ) )
#endif
            {
                OE_WARN << LC << "Cannot operate on \"" << tableName << "\" because metadata does not exist."
                    << std::endl;
                return ThreadTable( 0L, 0L );
            }

            _tables[tableName] = new LayerTable( meta, db );
            OE_DEBUG << LC << "New LayerTable for " << tableName << std::endl;
        }
        return ThreadTable( _tables[tableName].get(), db );
    }
bool Sqlite3Cache::isCached ( const TileKey key,
const CacheSpec spec 
) const [inline, virtual]

Gets whether the given TileKey is cached or not

Reimplemented from osgEarth::Cache.

Definition at line 1078 of file Sqlite3Cache.cpp.

    {
        // this looks ineffecient, but usually when isCached() is called, getImage() will be
        // called soon thereafter. And this call will load it into the L2 cache so the subsequent
        // getImage call will not hit the DB again.
        osg::ref_ptr<const osg::Image> temp;
        return const_cast<Sqlite3Cache*>(this)->getImage( key, spec, temp );
    }
virtual bool Sqlite3Cache::loadProperties ( const std::string &  cacheId,
CacheSpec out_spec,
osg::ref_ptr< const Profile > &  out_profile,
unsigned int &  out_tileSize 
) [inline, virtual]

Loads the cache profile for the given layer.

Reimplemented from osgEarth::Cache.

Definition at line 1129 of file Sqlite3Cache.cpp.

    {
        if ( !_db ) return 0L;

        ScopedLock<Mutex> lock( _tableListMutex ); // b/c we're using the base db handle

#ifdef SPLIT_LAYER_DB
        sqlite3* db = getOrCreateMetaDbForThread();
#else
        sqlite3* db = getOrCreateDbForThread();
#endif
        if ( !db )
            return 0L;

        OE_DEBUG << LC << "Loading metadata for layer \"" << cacheId << "\"" << std::endl;

        MetadataRecord rec;
        if ( _metadata.load( cacheId, db, rec ) )
        {
            out_spec = CacheSpec( rec._layerName, rec._format );
            out_tileSize = rec._tileSize;
            out_profile = rec._profile;
        }
        return 0L;
    }
Sqlite3Cache::META_Object ( osgEarth  ,
Sqlite3Cache   
)
bool Sqlite3Cache::purge ( const std::string &  layerName,
int  olderThanUTC,
bool  async 
) [inline, virtual]

Purges records from the database.

Reimplemented from osgEarth::Cache.

Definition at line 1312 of file Sqlite3Cache.cpp.

    {
        if ( !_db ) return false;

        // purge the L2 cache first:
        if ( async == true && _options.asyncWrites() == true )
        {
#ifdef PURGE_GENERAL
            if (!_pendingPurges.empty())
                return false;
            ScopedLock<Mutex> lock( _pendingPurgeMutex );
            AsyncPurge* req = new AsyncPurge(layerName, olderThanUTC, this);
            _writeService->add( req);
            _pendingPurges[layerName] = req;
#else
            if (_pendingPurges.find(layerName) != _pendingPurges.end()) {
                return false;
            } else {
                ScopedLock<Mutex> lock( _pendingPurgeMutex );
                AsyncPurge* req = new AsyncPurge(layerName, olderThanUTC, this);
                _writeService->add( req);
                _pendingPurges[layerName] = req;
            }
#endif
        }
        else
        {
#ifdef PURGE_GENERAL
            ScopedLock<Mutex> lock( _pendingPurgeMutex );

            sqlite3_int64 limit = _options.maxSize().value() * 1024 * 1024;
            std::map<std::string, std::pair<sqlite3_int64,int> > layers;
            sqlite3_int64 totalSize = 0;
            for (unsigned int i = 0; i < _layersList.size(); ++i) {
                ThreadTable tt = getTable( _layersList[i] );
                if ( tt._table ) {
                    sqlite3_int64 size = tt._table->getTableSize(tt._db);
                    layers[_layersList[i] ].first = size;
                    layers[_layersList[i] ].second = tt._table->getNbEntry(tt._db);
                    totalSize += size;
                }
            }
            OE_INFO << LC << "SQlite cache size " << totalSize/(1024*1024) << " MB" << std::endl;
            if (totalSize > 1.2 * limit) {
                sqlite3_int64 diff = totalSize - limit;
                for (unsigned int i = 0; i < _layersList.size(); ++i) {
                    float ratio = layers[_layersList[i] ].first * 1.0 / (float)(totalSize);
                    int sizeToRemove = (int)floor(ratio * diff);
                    if (sizeToRemove > 0) {
                        if (sizeToRemove / 1024 > 1024) {
                            OE_DEBUG << "Try to remove " << sizeToRemove/(1024*1024) << " MB in " << _layersList[i] << std::endl;
                        } else {
                            OE_DEBUG << "Try to remove " << sizeToRemove/1024 << " KB in " << _layersList[i] << std::endl;
                        }

                        if ( _L2cache.valid() )
                            _L2cache->purge( _layersList[i], olderThanUTC, async );
                        ThreadTable tt = getTable(_layersList[i]);
                        if ( tt._table ) {
                            float averageSizePerElement = layers[_layersList[i] ].first * 1.0 /layers[_layersList[i] ].second;
                            int nb = (int)floor(sizeToRemove / averageSizePerElement);
                            if (nb ) {
                                OE_DEBUG << "remove " << nb << " / " << layers[_layersList[i] ].second << " elements in " << _layersList[i] << std::endl;
                                tt._table->purge(olderThanUTC, nb, tt._db);
                            }
                        }
                    }
                }
            }
            _pendingPurges.clear();
            displayPendingOperations();

#else
            ScopedLock<Mutex> lock( _pendingPurgeMutex );
            if ( _L2cache.valid() )
                _L2cache->purge( layerName, olderThanUTC, async );

            ThreadTable tt = getTable( layerName );
            if ( tt._table )
            {
                _pendingPurges.erase( layerName );

                unsigned int maxsize = _options.getSize(layerName);
                tt._table->checkAndPurgeIfNeeded(tt._db, maxsize * 1024 * 1024);
                displayPendingOperations();
            }
#endif
        }
        return true;
    }

Here is the call graph for this function:

void Sqlite3Cache::setImage ( const TileKey key,
const CacheSpec spec,
const osg::Image *  image 
) [inline, virtual]

Sets the cached image for the given TileKey

Implements osgEarth::Cache.

Definition at line 1264 of file Sqlite3Cache.cpp.

    {        
        if ( !_db ) return;

        if ( _options.asyncWrites() == true )
        {
            // the "pending writes" table is here so that we don't try to write data to
            // the cache more than once when using an asynchronous write service.
            ScopedLock<Mutex> lock( _pendingWritesMutex );
#ifdef INSERT_POOL
            std::string name = layerName;
            std::map<std::string, osg::ref_ptr<AsyncInsertPool> >::iterator it = _pendingWrites.find(name);
            if ( it == _pendingWrites.end() )
            {
                AsyncInsertPool* req = new AsyncInsertPool(layerName, this);
                req->addEntry(key, format, image);
                _pendingWrites[name] = req;
                _writeService->add( req );
            }
            else
            {
                it->second->addEntry(key, format, image);
            }
#else
            std::string name = key.str() + spec.cacheId();
            if ( _pendingWrites.find(name) == _pendingWrites.end() )
            {
                AsyncInsert* req = new AsyncInsert(key, spec, image, this);
                _pendingWrites[name] = req;
                _writeService->add( req );
            }
            else
            {
                //NOTE: this should probably never happen.
                OE_WARN << LC << "Tried to setImage; already in queue: " << key.str() << std::endl;
            }
#endif
        }
        else
        {

            setImageSync( key, spec, image );
        }
    }

Here is the call graph for this function:

void Sqlite3Cache::setImageSync ( const TileKey key,
const CacheSpec spec,
const osg::Image *  image 
) [inline, private, virtual]

Implements AsyncCache.

Definition at line 1476 of file Sqlite3Cache.cpp.

    {
        if (_options.maxSize().value() > 0 && _nbRequest > MAX_REQUEST_TO_RUN_PURGE) {
            int t = (int)::time(0L);
            purge(spec.cacheId(), t, _options.asyncWrites().value() );
            _nbRequest = 0;
        }
        _nbRequest++;

        ThreadTable tt = getTable( spec.cacheId() );
        if ( tt._table )
        {
            ::time_t t = ::time(0L);
            ImageRecord rec( key );
            rec._created = (int)t;
            rec._accessed = (int)t;
            rec._image = image;

            tt._table->store( rec, tt._db );
        }

        if ( _options.asyncWrites() == true )
        {
            ScopedLock<Mutex> lock( _pendingWritesMutex );
            std::string name = key.str() + spec.cacheId();
            _pendingWrites.erase( name );
            displayPendingOperations();
        }
    }

Here is the call graph for this function:

virtual void Sqlite3Cache::storeProperties ( const CacheSpec spec,
const Profile profile,
unsigned int  tileSize 
) [inline, virtual]

Store the cache profile for the given profile.

Reimplemented from osgEarth::Cache.

Definition at line 1090 of file Sqlite3Cache.cpp.

    {
        if ( !_db ) return;

        if ( spec.cacheId().empty() || profile == 0L || spec.format().empty() )
        {
            OE_WARN << "ILLEGAL: cannot cache a layer without a layer id" << std::endl;
            return;
        }

        ScopedLock<Mutex> lock( _tableListMutex ); // b/c we're using the base db handle
#ifdef SPLIT_LAYER_DB
        sqlite3* db = getOrCreateMetaDbForThread();
#else
        sqlite3* db = getOrCreateDbForThread();
#endif
        if ( !db )
            return;

        //OE_INFO << "Storing metadata for layer \"" << layerName << "\"" << std::endl;

        MetadataRecord rec;
        rec._layerName = spec.cacheId();
        rec._profile = profile;
        rec._tileSize = tileSize;

#ifdef USE_SERIALIZERS
        rec._format = "osgb";
        rec._compressor = "zlib";
#else
        rec._format = spec.format();
#endif

        _metadata.store( rec, db );
    }

Here is the call graph for this function:

bool Sqlite3Cache::updateAccessTimeSync ( const std::string &  layerName,
const TileKey key,
int  newTimestamp 
) [inline]

updateAccessTime records on the database.

Definition at line 1406 of file Sqlite3Cache.cpp.

    {
        if ( !_db ) return false;

        ThreadTable tt = getTable(layerName);
        if ( tt._table )
        {
            tt._table->updateAccessTime( key, newTimestamp, tt._db );
        }
        return true;
    }

Here is the call graph for this function:

bool Sqlite3Cache::updateAccessTimeSyncPool ( const std::string &  layerName,
const std::string &  keys,
int  newTimestamp 
) [inline]

updateAccessTime records on the database.

Definition at line 1421 of file Sqlite3Cache.cpp.

    {
        if ( !_db ) return false;

        ThreadTable tt = getTable(layerName);
        if ( tt._table )
        {
            tt._table->updateAccessTimePool( keys, newTimestamp, tt._db );
        }

        {
            ScopedLock<Mutex> lock( _pendingUpdateMutex );
            _pendingUpdates.erase( layerName );
            displayPendingOperations();
        }
        return true;
    }

Here is the call graph for this function:


Member Data Documentation

int Sqlite3Cache::_count [private]

Definition at line 1669 of file Sqlite3Cache.cpp.

std::string Sqlite3Cache::_databasePath [private]

Definition at line 1673 of file Sqlite3Cache.cpp.

sqlite3* Sqlite3Cache::_db [private]

Definition at line 1661 of file Sqlite3Cache.cpp.

std::map<Thread*,sqlite3*> Sqlite3Cache::_dbPerThread [private]

Definition at line 1662 of file Sqlite3Cache.cpp.

std::map<std::string, std::map<Thread*,sqlite3*> > Sqlite3Cache::_dbPerThreadLayers [private]

Definition at line 1664 of file Sqlite3Cache.cpp.

std::map<Thread*,sqlite3*> Sqlite3Cache::_dbPerThreadMeta [private]

Definition at line 1665 of file Sqlite3Cache.cpp.

osg::ref_ptr<osgDB::ReaderWriter> Sqlite3Cache::_defaultRW [private]

Definition at line 1641 of file Sqlite3Cache.cpp.

osg::ref_ptr<MemCache> Sqlite3Cache::_L2cache [private]

Definition at line 1667 of file Sqlite3Cache.cpp.

std::vector<std::string> Sqlite3Cache::_layersList [private]

Definition at line 1672 of file Sqlite3Cache.cpp.

Definition at line 1643 of file Sqlite3Cache.cpp.

int Sqlite3Cache::_nbRequest [private]

Definition at line 1670 of file Sqlite3Cache.cpp.

Reimplemented from osgEarth::Cache.

Definition at line 1639 of file Sqlite3Cache.cpp.

Definition at line 1658 of file Sqlite3Cache.cpp.

std::map<std::string, osg::ref_ptr<AsyncPurge> > Sqlite3Cache::_pendingPurges [private]

Definition at line 1659 of file Sqlite3Cache.cpp.

Definition at line 1655 of file Sqlite3Cache.cpp.

std::map<std::string, osg::ref_ptr<AsyncUpdateAccessTimePool> > Sqlite3Cache::_pendingUpdates [private]

Definition at line 1656 of file Sqlite3Cache.cpp.

std::map<std::string, osg::ref_ptr<AsyncInsert> > Sqlite3Cache::_pendingWrites [private]

Definition at line 1653 of file Sqlite3Cache.cpp.

Definition at line 1648 of file Sqlite3Cache.cpp.

Definition at line 1642 of file Sqlite3Cache.cpp.

Definition at line 1644 of file Sqlite3Cache.cpp.

Definition at line 1646 of file Sqlite3Cache.cpp.

osg::ref_ptr<TaskService> Sqlite3Cache::_writeService [private]

Definition at line 1647 of file Sqlite3Cache.cpp.


The documentation for this class was generated from the following file:
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Defines