osgEarth 2.1.1
|
00001 /* -*-c++-*- */ 00002 /* osgEarth - Dynamic map generation toolkit for OpenSceneGraph 00003 * Copyright 2008-2010 Pelican Mapping 00004 * http://osgearth.org 00005 * 00006 * osgEarth is free software; you can redistribute it and/or modify 00007 * it under the terms of the GNU Lesser General Public License as published by 00008 * the Free Software Foundation; either version 2 of the License, or 00009 * (at your option) any later version. 00010 * 00011 * This program is distributed in the hope that it will be useful, 00012 * but WITHOUT ANY WARRANTY; without even the implied warranty of 00013 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 00014 * GNU Lesser General Public License for more details. 00015 * 00016 * You should have received a copy of the GNU Lesser General Public License 00017 * along with this program. If not, see <http://www.gnu.org/licenses/> 00018 */ 00019 #include <osgEarth/TaskService> 00020 #include <osg/Notify> 00021 00022 using namespace osgEarth; 00023 using namespace OpenThreads; 00024 00025 #define LC "[TaskService] " 00026 00027 //------------------------------------------------------------------------ 00028 00029 TaskRequest::TaskRequest( float priority ) : 00030 osg::Referenced( true ), 00031 _priority( priority ), 00032 _state( STATE_IDLE ) 00033 { 00034 _progress = new ProgressCallback(); 00035 } 00036 00037 void 00038 TaskRequest::run() 00039 { 00040 if ( _state == STATE_IN_PROGRESS ) 00041 { 00042 _startTime = osg::Timer::instance()->tick(); 00043 (*this)( _progress.get() ); 00044 _endTime = osg::Timer::instance()->tick(); 00045 } 00046 else 00047 { 00048 _progress->cancel(); 00049 } 00050 } 00051 00052 void 00053 TaskRequest::cancel() 00054 { 00055 //OE_INFO << LC << "TR [" << getName() << "] canceled" << std::endl; 00056 _progress->cancel(); 00057 } 00058 00059 bool 00060 TaskRequest::wasCanceled() const 00061 { 00062 return _progress->isCanceled(); 00063 } 00064 00065 //------------------------------------------------------------------------ 00066 00067 TaskRequestQueue::TaskRequestQueue() : 00068 osg::Referenced( true ), 00069 _done( false ) 00070 { 00071 } 00072 00073 void 00074 TaskRequestQueue::clear() 00075 { 00076 ScopedLock<Mutex> lock(_mutex); 00077 _requests.clear(); 00078 } 00079 00080 unsigned int 00081 TaskRequestQueue::getNumRequests() const 00082 { 00083 ScopedLock<Mutex> lock(const_cast<TaskRequestQueue*>(this)->_mutex); 00084 return _requests.size(); 00085 } 00086 00087 void 00088 TaskRequestQueue::add( TaskRequest* request ) 00089 { 00090 request->setState( TaskRequest::STATE_PENDING ); 00091 00092 // install a progress callback if one isn't already installed 00093 if ( !request->getProgressCallback() ) 00094 request->setProgressCallback( new ProgressCallback() ); 00095 00096 ScopedLock<Mutex> lock(_mutex); 00097 00098 // insert by priority. 00099 _requests.insert( std::pair<float,TaskRequest*>(request->getPriority(), request) ); 00100 00101 #if 0 00102 // insert by priority. 00103 bool inserted = false; 00104 for( TaskRequestList::iterator i = _requests.begin(); i != _requests.end(); i++ ) 00105 { 00106 if ( request->getPriority() > i->get()->getPriority() ) 00107 { 00108 _requests.insert( i, request ); 00109 inserted = true; 00110 //OE_NOTICE << "TaskRequestQueue size=" << _requests.size() << std::endl; 00111 break; 00112 } 00113 } 00114 00115 if ( !inserted ) 00116 _requests.push_back( request ); 00117 #endif 00118 00119 // since there is data in the queue, wake up one waiting task thread. 00120 _cond.signal(); 00121 } 00122 00123 TaskRequest* 00124 TaskRequestQueue::get() 00125 { 00126 ScopedLock<Mutex> lock(_mutex); 00127 00128 while ( !_done && _requests.empty() ) 00129 { 00130 // releases the mutex and waits on the condition. 00131 _cond.wait( &_mutex ); 00132 } 00133 00134 if ( _done ) 00135 { 00136 return 0L; 00137 } 00138 00139 osg::ref_ptr<TaskRequest> next = _requests.begin()->second.get(); //_requests.front(); 00140 _requests.erase( _requests.begin() ); //_requests.pop_front(); 00141 00142 // I'm done, someone else take a turn: 00143 // (technically this shouldn't be necessary since add() bumps the semaphore once 00144 // for each request in the queue) 00145 _cond.signal(); 00146 00147 return next.release(); 00148 } 00149 00150 void 00151 TaskRequestQueue::setDone() 00152 { 00153 // we need to obtain the mutex since we're using the Condition 00154 ScopedLock<Mutex> lock(_mutex); 00155 00156 _done = true; 00157 00158 // wake everyone up so they can see the _done flag set and exit. 00159 //_cond.broadcast(); 00160 00161 // alternative to buggy win32 broadcast (OSG pre-r10457 on windows) 00162 for(int i=0; i<128; i++) 00163 _cond.signal(); 00164 } 00165 00166 //------------------------------------------------------------------------ 00167 00168 TaskThread::TaskThread( TaskRequestQueue* queue ) : 00169 _queue( queue ), 00170 _done( false ) 00171 { 00172 //nop 00173 } 00174 00175 void 00176 TaskThread::run() 00177 { 00178 while( !_done ) 00179 { 00180 _request = _queue->get(); 00181 00182 if ( _done ) 00183 break; 00184 00185 if (_request.valid()) 00186 { 00187 // discard a completed or canceled request: 00188 if ( _request->getState() != TaskRequest::STATE_PENDING ) 00189 { 00190 _request->cancel(); 00191 } 00192 00193 else if ( !_request->wasCanceled() ) 00194 { 00195 if ( _request->getProgressCallback() ) 00196 _request->getProgressCallback()->onStarted(); 00197 00198 _request->setState( TaskRequest::STATE_IN_PROGRESS ); 00199 _request->run(); 00200 00201 //OE_INFO << LC << "Task \"" << _request->getName() << "\" runtime = " << _request->runTime() << " s." << std::endl; 00202 } 00203 else 00204 { 00205 //OE_INFO << LC << "Task \"" << _request->getName() << "\" was cancelled before it ran." << std::endl; 00206 } 00207 00208 _request->setState( TaskRequest::STATE_COMPLETED ); 00209 00210 // signal the completion of a request. 00211 if ( _request->getProgressCallback() ) 00212 _request->getProgressCallback()->onCompleted(); 00213 00214 // Release the request 00215 _request = 0; 00216 } 00217 } 00218 } 00219 00220 int 00221 TaskThread::cancel() 00222 { 00223 if ( isRunning() ) 00224 { 00225 _done = true; 00226 00227 if (_request.valid()) 00228 { 00229 _request->cancel(); 00230 } 00231 00232 while( isRunning() ) 00233 { 00234 OpenThreads::Thread::YieldCurrentThread(); 00235 } 00236 } 00237 return 0; 00238 } 00239 00240 //------------------------------------------------------------------------ 00241 00242 TaskService::TaskService( const std::string& name, int numThreads ): 00243 osg::Referenced( true ), 00244 _lastRemoveFinishedThreadsStamp(0), 00245 _name(name) 00246 { 00247 _queue = new TaskRequestQueue(); 00248 setNumThreads( numThreads ); 00249 } 00250 00251 unsigned int 00252 TaskService::getNumRequests() const 00253 { 00254 return _queue->getNumRequests(); 00255 } 00256 00257 void 00258 TaskService::add( TaskRequest* request ) 00259 { 00260 //OE_INFO << LC << "TS [" << _name << "] adding request [" << request->getName() << "]" << std::endl; 00261 _queue->add( request ); 00262 } 00263 00264 TaskService::~TaskService() 00265 { 00266 _queue->setDone(); 00267 00268 for( TaskThreads::iterator i = _threads.begin(); i != _threads.end(); i++ ) 00269 { 00270 (*i)->setDone(true); 00271 } 00272 00273 for( TaskThreads::iterator i = _threads.begin(); i != _threads.end(); i++ ) 00274 { 00275 (*i)->cancel(); 00276 delete (*i); 00277 } 00278 } 00279 00280 int 00281 TaskService::getStamp() const 00282 { 00283 return _queue->getStamp(); 00284 } 00285 00286 void 00287 TaskService::setStamp( int stamp ) 00288 { 00289 _queue->setStamp( stamp ); 00290 //Remove finished threads every 60 frames 00291 if (stamp - _lastRemoveFinishedThreadsStamp > 60) 00292 { 00293 removeFinishedThreads(); 00294 _lastRemoveFinishedThreadsStamp = stamp; 00295 } 00296 } 00297 00298 int 00299 TaskService::getNumThreads() const 00300 { 00301 return _numThreads; 00302 } 00303 00304 void 00305 TaskService::setNumThreads(int numThreads ) 00306 { 00307 if ( _numThreads != numThreads ) 00308 { 00309 _numThreads = osg::maximum(1, numThreads); 00310 adjustThreadCount(); 00311 } 00312 } 00313 00314 void 00315 TaskService::adjustThreadCount() 00316 { 00317 OpenThreads::ScopedLock<OpenThreads::ReentrantMutex> lock(_threadMutex); 00318 removeFinishedThreads(); 00319 int numActiveThreads = 0; 00320 for( TaskThreads::iterator i = _threads.begin(); i != _threads.end(); i++ ) 00321 { 00322 if (!(*i)->getDone()) numActiveThreads++; 00323 } 00324 00325 int diff = _numThreads - numActiveThreads; 00326 if (diff > 0) 00327 { 00328 OE_DEBUG << LC << "Adding " << diff << " threads to TaskService " << std::endl; 00329 //We need to add some threads 00330 for (int i = 0; i < diff; ++i) 00331 { 00332 TaskThread* thread = new TaskThread( _queue.get() ); 00333 _threads.push_back( thread ); 00334 thread->start(); 00335 } 00336 } 00337 else if (diff < 0) 00338 { 00339 diff = osg::absolute( diff ); 00340 OE_DEBUG << LC << "Removing " << diff << " threads from TaskService " << std::endl; 00341 int numRemoved = 0; 00342 //We need to remove some threads 00343 for( TaskThreads::iterator i = _threads.begin(); i != _threads.end(); i++ ) 00344 { 00345 if (!(*i)->getDone()) 00346 { 00347 (*i)->setDone( true ); 00348 numRemoved++; 00349 if (numRemoved == diff) break; 00350 } 00351 } 00352 } 00353 00354 OE_INFO << LC << "TaskService [" << _name << "] using " << _numThreads << " threads" << std::endl; 00355 } 00356 00357 void 00358 TaskService::removeFinishedThreads() 00359 { 00360 OpenThreads::ScopedLock<OpenThreads::ReentrantMutex> lock(_threadMutex); 00361 unsigned int numRemoved = 0; 00362 for (TaskThreads::iterator i = _threads.begin(); i != _threads.end();) 00363 { 00364 //Erase the threads are not running 00365 if (!(*i)->isRunning()) 00366 { 00367 i = _threads.erase( i ); 00368 numRemoved++; 00369 } 00370 else 00371 { 00372 i++; 00373 } 00374 } 00375 if (numRemoved > 0) 00376 { 00377 OE_DEBUG << LC << "Removed " << numRemoved << " finished threads " << std::endl; 00378 } 00379 } 00380 00381 //------------------------------------------------------------------------ 00382 00383 TaskServiceManager::TaskServiceManager( int numThreads ) : 00384 _numThreads( 0 ), 00385 _targetNumThreads( numThreads ) 00386 { 00387 //nop 00388 } 00389 00390 void 00391 TaskServiceManager::setNumThreads( int numThreads ) 00392 { 00393 reallocate( numThreads ); 00394 } 00395 00396 TaskService* 00397 TaskServiceManager::add( UID uid, float weight ) 00398 { 00399 ScopedLock<Mutex> lock( _taskServiceMgrMutex ); 00400 00401 if ( weight <= 0.0f ) 00402 weight = 0.001; 00403 00404 TaskServiceMap::iterator i = _services.find( uid ); 00405 if ( i != _services.end() ) 00406 { 00407 i->second.second = weight; 00408 reallocate( _targetNumThreads ); 00409 return i->second.first.get(); 00410 } 00411 else 00412 { 00413 TaskService* newService = new TaskService( "", 1 ); 00414 _services[uid] = WeightedTaskService( newService, weight ); 00415 reallocate( _targetNumThreads ); 00416 return newService; 00417 } 00418 } 00419 00420 TaskService* 00421 TaskServiceManager::get( UID uid ) const 00422 { 00423 ScopedLock<Mutex> lock( const_cast<TaskServiceManager*>(this)->_taskServiceMgrMutex ); 00424 TaskServiceMap::const_iterator i = _services.find(uid); 00425 return i != _services.end() ? i->second.first.get() : 0L; 00426 } 00427 00428 TaskService* 00429 TaskServiceManager::getOrAdd( UID uid, float weight ) 00430 { 00431 TaskService* service = get( uid ); 00432 return service ? service : add( uid, weight ); 00433 } 00434 00435 void 00436 TaskServiceManager::remove( TaskService* service ) 00437 { 00438 ScopedLock<Mutex> lock( _taskServiceMgrMutex ); 00439 for( TaskServiceMap::iterator i = _services.begin(); i != _services.end(); ++i ) 00440 { 00441 if ( i->second.first.get() == service ) 00442 { 00443 _services.erase( i ); 00444 reallocate( _targetNumThreads ); 00445 break; 00446 } 00447 } 00448 } 00449 00450 void 00451 TaskServiceManager::remove( UID uid ) 00452 { 00453 ScopedLock<Mutex> lock( _taskServiceMgrMutex ); 00454 _services.erase( uid ); 00455 reallocate( _targetNumThreads ); 00456 } 00457 00458 void 00459 TaskServiceManager::setWeight( TaskService* service, float weight ) 00460 { 00461 ScopedLock<Mutex> lock( _taskServiceMgrMutex ); 00462 00463 if ( weight <= 0.0f ) 00464 weight = 0.001; 00465 00466 if ( !service ) 00467 return; 00468 00469 for( TaskServiceMap::iterator i = _services.begin(); i != _services.end(); ++i ) 00470 { 00471 if ( i->second.first.get() == service ) 00472 { 00473 i->second.second = weight; 00474 reallocate( _targetNumThreads ); 00475 break; 00476 } 00477 } 00478 } 00479 00480 void 00481 TaskServiceManager::reallocate( int numThreads ) 00482 { 00483 // first, total up all the weights. 00484 float totalWeight = 0.0f; 00485 for( TaskServiceMap::const_iterator i = _services.begin(); i != _services.end(); ++i ) 00486 totalWeight += i->second.second; 00487 00488 // next divide the total thread pool size by the relative weight of each service. 00489 _numThreads = 0; 00490 for( TaskServiceMap::const_iterator i = _services.begin(); i != _services.end(); ++i ) 00491 { 00492 int threads = osg::maximum( 1, (int)( (float)_targetNumThreads * (i->second.second / totalWeight) ) ); 00493 i->second.first->setNumThreads( threads ); 00494 _numThreads += threads; 00495 } 00496 }