osgEarth 2.1.1
|
00001 /* -*-c++-*- */ 00002 /* osgEarth - Dynamic map generation toolkit for OpenSceneGraph 00003 * Copyright 2008-2010 Pelican Mapping 00004 * http://osgearth.org 00005 * 00006 * osgEarth is free software; you can redistribute it and/or modify 00007 * it under the terms of the GNU Lesser General Public License as published by 00008 * the Free Software Foundation; either version 2 of the License, or 00009 * (at your option) any later version. 00010 * 00011 * This program is distributed in the hope that it will be useful, 00012 * but WITHOUT ANY WARRANTY; without even the implied warranty of 00013 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 00014 * GNU Lesser General Public License for more details. 00015 * 00016 * You should have received a copy of the GNU Lesser General Public License 00017 * along with this program. If not, see <http://www.gnu.org/licenses/> 00018 */ 00019 #ifndef OSGEARTH_THREADING_UTILS_H 00020 #define OSGEARTH_THREADING_UTILS_H 1 00021 00022 #include <osgEarth/Common> 00023 #include <OpenThreads/Condition> 00024 #include <OpenThreads/Mutex> 00025 #include <OpenThreads/ReentrantMutex> 00026 #include <set> 00027 00028 #define USE_CUSTOM_READ_WRITE_LOCK 1 00029 //#ifdef _DEBUG 00030 //# define TRACE_THREADS 1 00031 //#endif 00032 00033 namespace osgEarth { namespace Threading 00034 { 00035 typedef OpenThreads::Mutex Mutex; 00036 typedef OpenThreads::ScopedLock<OpenThreads::Mutex> ScopedMutexLock; 00037 typedef OpenThreads::Thread Thread; 00038 00039 #ifdef USE_CUSTOM_READ_WRITE_LOCK 00040 00044 class Event 00045 { 00046 public: 00047 Event() : _set( false ) { } 00048 00049 ~Event() { 00050 reset(); 00051 for( int i=0; i<255; ++i ) // workaround buggy broadcast 00052 _cond.signal(); 00053 } 00054 00055 inline bool wait() { 00056 OpenThreads::ScopedLock<OpenThreads::Mutex> lock( _m ); 00057 return _set ? true : (_cond.wait( &_m ) == 0); 00058 } 00059 00061 inline bool waitAndReset() { 00062 OpenThreads::ScopedLock<OpenThreads::Mutex> lock( _m ); 00063 if ( _set ) { 00064 _set = false; 00065 return true; 00066 } 00067 else { 00068 bool value = _cond.wait( &_m ) == 0; 00069 _set = false; 00070 return value; 00071 } 00072 } 00073 00074 inline void set() { 00075 OpenThreads::ScopedLock<OpenThreads::Mutex> lock( _m ); 00076 if ( !_set ) { 00077 _set = true; 00078 _cond.broadcast(); // possible deadlock before OSG r10457 on windows 00079 //_cond.signal(); 00080 } 00081 } 00082 00083 inline void reset() { 00084 OpenThreads::ScopedLock<OpenThreads::Mutex> lock( _m ); 00085 _set = false; 00086 } 00087 00088 inline bool isSet() const { 00089 return _set; 00090 } 00091 00092 protected: 00093 OpenThreads::Mutex _m; 00094 OpenThreads::Condition _cond; 00095 bool _set; 00096 }; 00097 00099 class MultiEvent 00100 { 00101 public: 00102 MultiEvent( int num =1 ) : _set( num ), _num(num) { } 00103 00104 ~MultiEvent() { 00105 reset(); 00106 for( int i=0; i<255; ++i ) // workaround buggy broadcast 00107 _cond.signal(); 00108 } 00109 00110 inline bool wait() { 00111 OpenThreads::ScopedLock<OpenThreads::Mutex> lock( _m ); 00112 while( _set > 0 ) 00113 if ( _cond.wait( &_m ) != 0 ) 00114 return false; 00115 return true; 00116 } 00117 00119 inline bool waitAndReset() { 00120 OpenThreads::ScopedLock<OpenThreads::Mutex> lock( _m ); 00121 while( _set > 0 ) 00122 if ( _cond.wait( &_m ) != 0 ) 00123 return false; 00124 _set = _num; 00125 return true; 00126 } 00127 00128 inline void notify() { 00129 OpenThreads::ScopedLock<OpenThreads::Mutex> lock( _m ); 00130 if ( _set > 0 ) 00131 --_set; 00132 if ( _set == 0 ) 00133 _cond.broadcast(); // possible deadlock before OSG r10457 on windows 00134 //_cond.signal(); 00135 } 00136 00137 inline void reset( int num =0 ) { 00138 OpenThreads::ScopedLock<OpenThreads::Mutex> lock( _m ); 00139 if ( num > 0 ) _num = num; 00140 _set = _num; 00141 } 00142 00143 protected: 00144 OpenThreads::Mutex _m; 00145 OpenThreads::Condition _cond; 00146 int _set, _num; 00147 }; 00148 00156 class ReadWriteMutex 00157 { 00158 #if TRACE_THREADS 00159 typedef std::set<OpenThreads::Thread*> TracedThreads; 00160 TracedThreads _trace; 00161 OpenThreads::Mutex _traceMutex; 00162 #endif 00163 00164 public: 00165 ReadWriteMutex() : 00166 _readerCount(0) 00167 { 00168 _noWriterEvent.set(); 00169 _noReadersEvent.set(); 00170 } 00171 00172 void readLock() 00173 { 00174 00175 #ifdef TRACE_THREADS 00176 { 00177 OpenThreads::ScopedLock<OpenThreads::Mutex> ttLock(_traceMutex); 00178 if( _trace.find(OpenThreads::Thread::CurrentThread()) != _trace.end() ) 00179 OE_WARN << "TRACE: tried to double-lock" << std::endl; 00180 } 00181 #endif 00182 for( ; ; ) 00183 { 00184 _noWriterEvent.wait(); // wait for a writer to quit if there is one 00185 incrementReaderCount(); // register this reader 00186 if ( !_noWriterEvent.isSet() ) // double lock check, in case a writer snuck in while inrementing 00187 decrementReaderCount(); // if it did, undo the registration and try again 00188 else 00189 break; // otherwise, we're in 00190 } 00191 00192 #ifdef TRACE_THREADS 00193 { 00194 OpenThreads::ScopedLock<OpenThreads::Mutex> ttLock(_traceMutex); 00195 _trace.insert(OpenThreads::Thread::CurrentThread()); 00196 } 00197 #endif 00198 } 00199 00200 void readUnlock() 00201 { 00202 decrementReaderCount(); // unregister this reader 00203 00204 #ifdef TRACE_THREADS 00205 { 00206 OpenThreads::ScopedLock<OpenThreads::Mutex> ttLock(_traceMutex); 00207 _trace.erase(OpenThreads::Thread::CurrentThread()); 00208 } 00209 #endif 00210 } 00211 00212 void writeLock() 00213 { 00214 #ifdef TRACE_THREADS 00215 { 00216 OpenThreads::ScopedLock<OpenThreads::Mutex> ttLock(_traceMutex); 00217 if( _trace.find(OpenThreads::Thread::CurrentThread()) != _trace.end() ) 00218 OE_WARN << "TRACE: tried to double-lock" << std::endl; 00219 } 00220 #endif 00221 OpenThreads::ScopedLock<OpenThreads::Mutex> lock( _lockWriterMutex ); // one at a time please 00222 _noWriterEvent.wait(); // wait for a writer to quit if there is one 00223 _noWriterEvent.reset(); // prevent further writers from joining 00224 _noReadersEvent.wait(); // wait for all readers to quit 00225 00226 #ifdef TRACE_THREADS 00227 { 00228 OpenThreads::ScopedLock<OpenThreads::Mutex> ttLock(_traceMutex); 00229 _trace.insert(OpenThreads::Thread::CurrentThread()); 00230 } 00231 #endif 00232 } 00233 00234 void writeUnlock() 00235 { 00236 _noWriterEvent.set(); 00237 00238 #ifdef TRACE_THREADS 00239 { 00240 OpenThreads::ScopedLock<OpenThreads::Mutex> ttLock(_traceMutex); 00241 _trace.erase(OpenThreads::Thread::CurrentThread()); 00242 } 00243 #endif 00244 } 00245 00246 protected: 00247 00248 void incrementReaderCount() 00249 { 00250 OpenThreads::ScopedLock<OpenThreads::Mutex> lock( _readerCountMutex ); 00251 _readerCount++; // add a reader 00252 _noReadersEvent.reset(); // there's at least one reader now so clear the flag 00253 } 00254 00255 void decrementReaderCount() 00256 { 00257 OpenThreads::ScopedLock<OpenThreads::Mutex> lock( _readerCountMutex ); 00258 _readerCount--; // remove a reader 00259 if ( _readerCount <= 0 ) // if that was the last one, signal that writers are now allowed 00260 _noReadersEvent.set(); 00261 } 00262 00263 private: 00264 int _readerCount; 00265 OpenThreads::Mutex _lockWriterMutex; 00266 OpenThreads::Mutex _readerCountMutex; 00267 Event _noWriterEvent; 00268 Event _noReadersEvent; 00269 }; 00270 00271 00272 struct ScopedWriteLock 00273 { 00274 ScopedWriteLock( ReadWriteMutex& lock ) : _lock(lock) { _lock.writeLock(); } 00275 ~ScopedWriteLock() { _lock.writeUnlock(); } 00276 protected: 00277 ReadWriteMutex& _lock; 00278 }; 00279 00280 struct ScopedReadLock 00281 { 00282 ScopedReadLock( ReadWriteMutex& lock ) : _lock(lock) { _lock.readLock(); } 00283 ~ScopedReadLock() { _lock.readUnlock(); } 00284 protected: 00285 ReadWriteMutex& _lock; 00286 }; 00287 00288 #else 00289 00290 typedef OpenThreads::ReadWriteMutex ReadWriteMutex; 00291 typedef OpenThreads::ScopedWriteLock ScopedWriteLock; 00292 typedef OpenThreads::ScopedReadLock ScopedReadLock; 00293 00294 #endif 00295 00297 template<typename T> 00298 struct PerThread 00299 { 00300 T& get() { 00301 ScopedMutexLock lock(_mutex); 00302 return _data[OpenThreads::Thread::CurrentThread()]; 00303 } 00304 const T& get() const { 00305 ScopedMutexLock lock(_mutex); 00306 return _data[OpenThreads::Thread::CurrentThread()]; 00307 } 00308 private: 00309 std::map<OpenThreads::Thread*,T> _data; 00310 OpenThreads::Mutex _mutex; 00311 }; 00312 00313 } } // namepsace osgEarth::Threading 00314 00315 00316 #endif // OSGEARTH_THREADING_UTILS_H 00317