osgEarth 2.1.1

/home/cube/sources/osgearth/src/osgEarth/ThreadingUtils

Go to the documentation of this file.
00001 /* -*-c++-*- */
00002 /* osgEarth - Dynamic map generation toolkit for OpenSceneGraph
00003  * Copyright 2008-2010 Pelican Mapping
00004  * http://osgearth.org
00005  *
00006  * osgEarth is free software; you can redistribute it and/or modify
00007  * it under the terms of the GNU Lesser General Public License as published by
00008  * the Free Software Foundation; either version 2 of the License, or
00009  * (at your option) any later version.
00010  *
00011  * This program is distributed in the hope that it will be useful,
00012  * but WITHOUT ANY WARRANTY; without even the implied warranty of
00013  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
00014  * GNU Lesser General Public License for more details.
00015  *
00016  * You should have received a copy of the GNU Lesser General Public License
00017  * along with this program.  If not, see <http://www.gnu.org/licenses/>
00018  */
00019 #ifndef OSGEARTH_THREADING_UTILS_H
00020 #define OSGEARTH_THREADING_UTILS_H 1
00021 
00022 #include <osgEarth/Common>
00023 #include <OpenThreads/Condition>
00024 #include <OpenThreads/Mutex>
00025 #include <OpenThreads/ReentrantMutex>
00026 #include <set>
00027 
00028 #define USE_CUSTOM_READ_WRITE_LOCK 1
00029 //#ifdef _DEBUG
00030 //#  define TRACE_THREADS 1
00031 //#endif
00032 
00033 namespace osgEarth { namespace Threading
00034 {   
00035     typedef OpenThreads::Mutex Mutex;
00036     typedef OpenThreads::ScopedLock<OpenThreads::Mutex> ScopedMutexLock;
00037     typedef OpenThreads::Thread Thread;
00038 
00039 #ifdef USE_CUSTOM_READ_WRITE_LOCK
00040 
00044     class Event 
00045     {
00046     public:
00047         Event() : _set( false ) { }
00048 
00049         ~Event() { 
00050             reset(); 
00051             for( int i=0; i<255; ++i ) // workaround buggy broadcast
00052                 _cond.signal();
00053         }
00054 
00055         inline bool wait() {
00056             OpenThreads::ScopedLock<OpenThreads::Mutex> lock( _m );
00057             return _set ? true : (_cond.wait( &_m ) == 0);
00058         }
00059 
00061         inline bool waitAndReset() {
00062             OpenThreads::ScopedLock<OpenThreads::Mutex> lock( _m );
00063             if ( _set ) {
00064                 _set = false;
00065                 return true;
00066             }
00067             else {
00068                 bool value = _cond.wait( &_m ) == 0;
00069                 _set = false;
00070                 return value;
00071             }
00072         }
00073 
00074         inline void set() {
00075             OpenThreads::ScopedLock<OpenThreads::Mutex> lock( _m );
00076             if ( !_set ) {
00077                 _set = true;
00078                 _cond.broadcast(); // possible deadlock before OSG r10457 on windows
00079                 //_cond.signal();
00080             }
00081         }
00082 
00083         inline void reset() {
00084             OpenThreads::ScopedLock<OpenThreads::Mutex> lock( _m );
00085             _set = false;
00086         }
00087 
00088         inline bool isSet() const {
00089             return _set;
00090         }
00091 
00092     protected:
00093         OpenThreads::Mutex _m;
00094         OpenThreads::Condition _cond;
00095         bool _set;
00096     };
00097 
00099     class MultiEvent 
00100     {
00101     public:
00102         MultiEvent( int num =1 ) : _set( num ), _num(num)  { }
00103 
00104         ~MultiEvent() {
00105             reset();
00106             for( int i=0; i<255; ++i ) // workaround buggy broadcast
00107                 _cond.signal();
00108         }
00109 
00110         inline bool wait() {
00111             OpenThreads::ScopedLock<OpenThreads::Mutex> lock( _m );
00112             while( _set > 0 )
00113                 if ( _cond.wait( &_m ) != 0 )
00114                     return false;
00115             return true;
00116         }
00117 
00119         inline bool waitAndReset() {
00120             OpenThreads::ScopedLock<OpenThreads::Mutex> lock( _m );
00121             while( _set > 0 )
00122                 if ( _cond.wait( &_m ) != 0 )
00123                     return false;
00124             _set = _num;
00125             return true;
00126         }
00127 
00128         inline void notify() {
00129             OpenThreads::ScopedLock<OpenThreads::Mutex> lock( _m );
00130             if ( _set > 0 )
00131                 --_set;
00132             if ( _set == 0 )
00133                 _cond.broadcast(); // possible deadlock before OSG r10457 on windows
00134             //_cond.signal();
00135         }
00136 
00137         inline void reset( int num =0 ) {
00138             OpenThreads::ScopedLock<OpenThreads::Mutex> lock( _m );
00139             if ( num > 0 ) _num = num;
00140             _set = _num;
00141         }
00142 
00143     protected:
00144         OpenThreads::Mutex _m;
00145         OpenThreads::Condition _cond;
00146         int _set, _num;
00147     };
00148 
00156     class ReadWriteMutex
00157     {
00158 #if TRACE_THREADS
00159         typedef std::set<OpenThreads::Thread*> TracedThreads;
00160         TracedThreads _trace;
00161         OpenThreads::Mutex _traceMutex;
00162 #endif
00163 
00164     public:
00165         ReadWriteMutex() :
00166           _readerCount(0)
00167         { 
00168             _noWriterEvent.set();
00169             _noReadersEvent.set();
00170         }
00171 
00172         void readLock()
00173         {
00174 
00175 #ifdef TRACE_THREADS
00176             {
00177                 OpenThreads::ScopedLock<OpenThreads::Mutex> ttLock(_traceMutex);
00178                 if( _trace.find(OpenThreads::Thread::CurrentThread()) != _trace.end() )
00179                     OE_WARN << "TRACE: tried to double-lock" << std::endl;
00180             }
00181 #endif
00182             for( ; ; )
00183             {
00184                 _noWriterEvent.wait();             // wait for a writer to quit if there is one
00185                 incrementReaderCount();            // register this reader
00186                 if ( !_noWriterEvent.isSet() )     // double lock check, in case a writer snuck in while inrementing
00187                     decrementReaderCount();        // if it did, undo the registration and try again
00188                 else
00189                     break;                         // otherwise, we're in
00190             }
00191 
00192 #ifdef TRACE_THREADS
00193             {
00194                 OpenThreads::ScopedLock<OpenThreads::Mutex> ttLock(_traceMutex);
00195                 _trace.insert(OpenThreads::Thread::CurrentThread());
00196             }
00197 #endif
00198         }
00199 
00200         void readUnlock()
00201         {
00202             decrementReaderCount();                // unregister this reader
00203             
00204 #ifdef TRACE_THREADS
00205             {
00206                 OpenThreads::ScopedLock<OpenThreads::Mutex> ttLock(_traceMutex);
00207                 _trace.erase(OpenThreads::Thread::CurrentThread());
00208             }
00209 #endif
00210         }
00211 
00212         void writeLock()
00213         {
00214 #ifdef TRACE_THREADS
00215             {
00216                 OpenThreads::ScopedLock<OpenThreads::Mutex> ttLock(_traceMutex);
00217                 if( _trace.find(OpenThreads::Thread::CurrentThread()) != _trace.end() )
00218                     OE_WARN << "TRACE: tried to double-lock" << std::endl;
00219             }
00220 #endif
00221             OpenThreads::ScopedLock<OpenThreads::Mutex> lock( _lockWriterMutex ); // one at a time please
00222             _noWriterEvent.wait();    // wait for a writer to quit if there is one
00223             _noWriterEvent.reset();   // prevent further writers from joining
00224             _noReadersEvent.wait();   // wait for all readers to quit
00225 
00226 #ifdef TRACE_THREADS
00227             {
00228                 OpenThreads::ScopedLock<OpenThreads::Mutex> ttLock(_traceMutex);
00229                 _trace.insert(OpenThreads::Thread::CurrentThread());
00230             }
00231 #endif
00232         }
00233 
00234         void writeUnlock()
00235         {
00236             _noWriterEvent.set();
00237 
00238 #ifdef TRACE_THREADS
00239             {
00240                 OpenThreads::ScopedLock<OpenThreads::Mutex> ttLock(_traceMutex);
00241                 _trace.erase(OpenThreads::Thread::CurrentThread());
00242             }
00243 #endif
00244         }
00245 
00246     protected:
00247 
00248         void incrementReaderCount()
00249         {
00250             OpenThreads::ScopedLock<OpenThreads::Mutex> lock( _readerCountMutex );
00251             _readerCount++;            // add a reader
00252             _noReadersEvent.reset();   // there's at least one reader now so clear the flag
00253         }
00254 
00255         void decrementReaderCount()
00256         {
00257             OpenThreads::ScopedLock<OpenThreads::Mutex> lock( _readerCountMutex );
00258             _readerCount--;               // remove a reader
00259             if ( _readerCount <= 0 )      // if that was the last one, signal that writers are now allowed
00260                 _noReadersEvent.set();
00261         }
00262 
00263     private:
00264         int _readerCount;
00265         OpenThreads::Mutex _lockWriterMutex;
00266         OpenThreads::Mutex _readerCountMutex;
00267         Event _noWriterEvent;
00268         Event _noReadersEvent;
00269     };
00270 
00271 
00272     struct ScopedWriteLock
00273     {
00274         ScopedWriteLock( ReadWriteMutex& lock ) : _lock(lock) { _lock.writeLock(); }
00275         ~ScopedWriteLock() { _lock.writeUnlock(); }
00276     protected:
00277         ReadWriteMutex& _lock;
00278     };
00279 
00280     struct ScopedReadLock
00281     {
00282         ScopedReadLock( ReadWriteMutex& lock ) : _lock(lock) { _lock.readLock(); }
00283         ~ScopedReadLock() { _lock.readUnlock(); }
00284     protected:
00285         ReadWriteMutex& _lock;
00286     };
00287 
00288 #else
00289 
00290     typedef OpenThreads::ReadWriteMutex  ReadWriteMutex;
00291     typedef OpenThreads::ScopedWriteLock ScopedWriteLock;
00292     typedef OpenThreads::ScopedReadLock  ScopedReadLock;
00293 
00294 #endif
00295 
00297     template<typename T>
00298     struct PerThread
00299     {
00300         T& get() {
00301             ScopedMutexLock lock(_mutex);
00302             return _data[OpenThreads::Thread::CurrentThread()];
00303         }
00304         const T& get() const {
00305             ScopedMutexLock lock(_mutex);
00306             return _data[OpenThreads::Thread::CurrentThread()];
00307         }
00308     private:
00309         std::map<OpenThreads::Thread*,T> _data;
00310         OpenThreads::Mutex               _mutex;
00311     };
00312 
00313 } } // namespace osgEarth::Threading
00314 
00315 
00316 #endif // OSGEARTH_THREADING_UTILS_H
00317 
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Defines