osgEarth 2.1.1

/home/cube/sources/osgearth/src/osgEarth/TaskService.cpp

Go to the documentation of this file.
00001 /* -*-c++-*- */
00002 /* osgEarth - Dynamic map generation toolkit for OpenSceneGraph
00003  * Copyright 2008-2010 Pelican Mapping
00004  * http://osgearth.org
00005  *
00006  * osgEarth is free software; you can redistribute it and/or modify
00007  * it under the terms of the GNU Lesser General Public License as published by
00008  * the Free Software Foundation; either version 2 of the License, or
00009  * (at your option) any later version.
00010  *
00011  * This program is distributed in the hope that it will be useful,
00012  * but WITHOUT ANY WARRANTY; without even the implied warranty of
00013  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
00014  * GNU Lesser General Public License for more details.
00015  *
00016  * You should have received a copy of the GNU Lesser General Public License
00017  * along with this program.  If not, see <http://www.gnu.org/licenses/>
00018  */
00019 #include <osgEarth/TaskService>
00020 #include <osg/Notify>
00021 
00022 using namespace osgEarth;
00023 using namespace OpenThreads;
00024 
00025 #define LC "[TaskService] "
00026 
00027 //------------------------------------------------------------------------
00028 
00029 TaskRequest::TaskRequest( float priority ) :
00030 osg::Referenced( true ),
00031 _priority( priority ),
00032 _state( STATE_IDLE )
00033 {
00034     _progress = new ProgressCallback();
00035 }
00036 
00037 void
00038 TaskRequest::run()
00039 {
00040     if ( _state == STATE_IN_PROGRESS )
00041     {
00042         _startTime = osg::Timer::instance()->tick();
00043         (*this)( _progress.get() );        
00044         _endTime = osg::Timer::instance()->tick();
00045     }
00046     else
00047     {
00048         _progress->cancel();
00049     }
00050 }
00051 
00052 void 
00053 TaskRequest::cancel()
00054 {
00055     //OE_INFO << LC << "TR [" << getName() << "] canceled" << std::endl;
00056     _progress->cancel();
00057 }
00058 
00059 bool
00060 TaskRequest::wasCanceled() const
00061 {
00062     return _progress->isCanceled();
00063 }
00064 
00065 //------------------------------------------------------------------------
00066 
00067 TaskRequestQueue::TaskRequestQueue() :
00068 osg::Referenced( true ),
00069 _done( false )
00070 {
00071 }
00072 
00073 void
00074 TaskRequestQueue::clear()
00075 {
00076     ScopedLock<Mutex> lock(_mutex);
00077     _requests.clear();
00078 }
00079 
00080 unsigned int
00081 TaskRequestQueue::getNumRequests() const
00082 {
00083     ScopedLock<Mutex> lock(const_cast<TaskRequestQueue*>(this)->_mutex);
00084     return _requests.size();
00085 }
00086 
00087 void 
00088 TaskRequestQueue::add( TaskRequest* request )
00089 {
00090     request->setState( TaskRequest::STATE_PENDING );
00091 
00092     // install a progress callback if one isn't already installed
00093     if ( !request->getProgressCallback() )
00094         request->setProgressCallback( new ProgressCallback() );
00095 
00096     ScopedLock<Mutex> lock(_mutex);
00097 
00098     // insert by priority.
00099     _requests.insert( std::pair<float,TaskRequest*>(request->getPriority(), request) );
00100 
00101 #if 0
00102     // insert by priority.
00103     bool inserted = false;
00104     for( TaskRequestList::iterator i = _requests.begin(); i != _requests.end(); i++ )
00105     {
00106         if ( request->getPriority() > i->get()->getPriority() )
00107         {
00108             _requests.insert( i, request );
00109             inserted = true;
00110             //OE_NOTICE << "TaskRequestQueue size=" << _requests.size() << std::endl;
00111             break;
00112         }
00113     }
00114 
00115     if ( !inserted )
00116         _requests.push_back( request );
00117 #endif
00118 
00119     // since there is data in the queue, wake up one waiting task thread.
00120     _cond.signal();
00121 }
00122 
00123 TaskRequest* 
00124 TaskRequestQueue::get()
00125 {
00126     ScopedLock<Mutex> lock(_mutex);
00127 
00128     while ( !_done && _requests.empty() )
00129     {
00130         // releases the mutex and waits on the condition.
00131         _cond.wait( &_mutex );
00132     }
00133 
00134     if ( _done )
00135     {
00136         return 0L;
00137     }
00138 
00139     osg::ref_ptr<TaskRequest> next = _requests.begin()->second.get(); //_requests.front();
00140     _requests.erase( _requests.begin() ); //_requests.pop_front();
00141 
00142     // I'm done, someone else take a turn:
00143     // (technically this shouldn't be necessary since add() bumps the semaphore once
00144     // for each request in the queue)
00145     _cond.signal();
00146 
00147     return next.release();
00148 }
00149 
00150 void
00151 TaskRequestQueue::setDone()
00152 {
00153     // we need to obtain the mutex since we're using the Condition
00154     ScopedLock<Mutex> lock(_mutex);
00155 
00156     _done = true;
00157 
00158     // wake everyone up so they can see the _done flag set and exit.
00159     //_cond.broadcast();
00160 
00161     // alternative to buggy win32 broadcast (OSG pre-r10457 on windows)
00162     for(int i=0; i<128; i++)
00163         _cond.signal();
00164 }
00165 
00166 //------------------------------------------------------------------------
00167 
00168 TaskThread::TaskThread( TaskRequestQueue* queue ) :
00169 _queue( queue ),
00170 _done( false )
00171 {
00172     //nop
00173 }
00174 
00175 void
00176 TaskThread::run()
00177 {
00178     while( !_done )
00179     {
00180         _request = _queue->get();
00181 
00182         if ( _done )
00183             break;
00184 
00185         if (_request.valid())
00186         { 
00187             // discard a completed or canceled request:
00188             if ( _request->getState() != TaskRequest::STATE_PENDING )
00189             {
00190                 _request->cancel();
00191             }
00192 
00193             else if ( !_request->wasCanceled() )
00194             {
00195                 if ( _request->getProgressCallback() )
00196                     _request->getProgressCallback()->onStarted();
00197 
00198                 _request->setState( TaskRequest::STATE_IN_PROGRESS );
00199                 _request->run();
00200 
00201                 //OE_INFO << LC << "Task \"" << _request->getName() << "\" runtime = " << _request->runTime() << " s." << std::endl;
00202             }
00203             else
00204             {
00205                 //OE_INFO << LC << "Task \"" << _request->getName() << "\" was cancelled before it ran." << std::endl;
00206             }
00207             
00208             _request->setState( TaskRequest::STATE_COMPLETED );
00209 
00210             // signal the completion of a request.
00211             if ( _request->getProgressCallback() )
00212                 _request->getProgressCallback()->onCompleted();
00213 
00214             // Release the request
00215             _request = 0;
00216         }
00217     }
00218 }
00219 
00220 int
00221 TaskThread::cancel()
00222 {
00223     if ( isRunning() )
00224     {
00225         _done = true;  
00226 
00227         if (_request.valid())
00228         {
00229             _request->cancel();
00230         }
00231 
00232         while( isRunning() )
00233         {        
00234             OpenThreads::Thread::YieldCurrentThread();
00235         }
00236     }
00237     return 0;
00238 }
00239 
00240 //------------------------------------------------------------------------
00241 
00242 TaskService::TaskService( const std::string& name, int numThreads ):
00243 osg::Referenced( true ),
00244 _lastRemoveFinishedThreadsStamp(0),
00245 _name(name)
00246 {
00247     _queue = new TaskRequestQueue();
00248     setNumThreads( numThreads );
00249 }
00250 
00251 unsigned int
00252 TaskService::getNumRequests() const
00253 {
00254     return _queue->getNumRequests();
00255 }
00256 
00257 void
00258 TaskService::add( TaskRequest* request )
00259 {   
00260     //OE_INFO << LC << "TS [" << _name << "] adding request [" << request->getName() << "]" << std::endl;
00261     _queue->add( request );
00262 }
00263 
00264 TaskService::~TaskService()
00265 {
00266     _queue->setDone();
00267 
00268     for( TaskThreads::iterator i = _threads.begin(); i != _threads.end(); i++ )
00269     {
00270         (*i)->setDone(true);
00271     }
00272 
00273     for( TaskThreads::iterator i = _threads.begin(); i != _threads.end(); i++ )
00274     {
00275         (*i)->cancel();
00276         delete (*i);
00277     }
00278 }
00279 
00280 int
00281 TaskService::getStamp() const
00282 {
00283     return _queue->getStamp();
00284 }
00285 
00286 void
00287 TaskService::setStamp( int stamp )
00288 {
00289     _queue->setStamp( stamp );
00290     //Remove finished threads every 60 frames
00291     if (stamp - _lastRemoveFinishedThreadsStamp > 60)
00292     {
00293         removeFinishedThreads();
00294         _lastRemoveFinishedThreadsStamp = stamp;
00295     }
00296 }
00297 
00298 int
00299 TaskService::getNumThreads() const
00300 {
00301     return _numThreads;
00302 }
00303 
00304 void
00305 TaskService::setNumThreads(int numThreads )
00306 {
00307     if ( _numThreads != numThreads )
00308     {
00309         _numThreads = osg::maximum(1, numThreads);
00310         adjustThreadCount();
00311     }
00312 }
00313 
00314 void
00315 TaskService::adjustThreadCount()
00316 {
00317     OpenThreads::ScopedLock<OpenThreads::ReentrantMutex> lock(_threadMutex);
00318     removeFinishedThreads();
00319     int numActiveThreads = 0;
00320     for( TaskThreads::iterator i = _threads.begin(); i != _threads.end(); i++ )
00321     {
00322         if (!(*i)->getDone()) numActiveThreads++;
00323     }
00324 
00325     int diff = _numThreads - numActiveThreads;
00326     if (diff > 0)
00327     {
00328         OE_DEBUG << LC << "Adding " << diff << " threads to TaskService " << std::endl;
00329         //We need to add some threads
00330         for (int i = 0; i < diff; ++i)
00331         {
00332             TaskThread* thread = new TaskThread( _queue.get() );
00333             _threads.push_back( thread );
00334             thread->start();
00335         }       
00336     }
00337     else if (diff < 0)
00338     {
00339         diff = osg::absolute( diff );
00340         OE_DEBUG << LC << "Removing " << diff << " threads from TaskService " << std::endl;
00341         int numRemoved = 0;
00342         //We need to remove some threads
00343         for( TaskThreads::iterator i = _threads.begin(); i != _threads.end(); i++ )
00344         {
00345             if (!(*i)->getDone())
00346             {
00347                 (*i)->setDone( true );
00348                 numRemoved++;
00349                 if (numRemoved == diff) break;
00350             }
00351         }
00352     }  
00353 
00354     OE_INFO << LC << "TaskService [" << _name << "] using " << _numThreads << " threads" << std::endl;
00355 }
00356 
00357 void
00358 TaskService::removeFinishedThreads()
00359 {
00360     OpenThreads::ScopedLock<OpenThreads::ReentrantMutex> lock(_threadMutex);
00361     unsigned int numRemoved = 0;
00362     for (TaskThreads::iterator i = _threads.begin(); i != _threads.end();)
00363     {
00364         //Erase the threads are not running
00365         if (!(*i)->isRunning())
00366         {
00367             i = _threads.erase( i );
00368             numRemoved++;
00369         }
00370         else
00371         {
00372             i++;
00373         }
00374     }
00375     if (numRemoved > 0)
00376     {
00377         OE_DEBUG << LC << "Removed " << numRemoved << " finished threads " << std::endl;
00378     }
00379 }
00380 
00381 //------------------------------------------------------------------------
00382 
00383 TaskServiceManager::TaskServiceManager( int numThreads ) :
00384 _numThreads( 0 ),
00385 _targetNumThreads( numThreads )
00386 {
00387     //nop
00388 }
00389 
00390 void
00391 TaskServiceManager::setNumThreads( int numThreads )
00392 {
00393     reallocate( numThreads );
00394 }
00395 
00396 TaskService*
00397 TaskServiceManager::add( UID uid, float weight )
00398 {
00399     ScopedLock<Mutex> lock( _taskServiceMgrMutex );
00400 
00401     if ( weight <= 0.0f )
00402         weight = 0.001;
00403 
00404     TaskServiceMap::iterator i = _services.find( uid );
00405     if ( i != _services.end() )
00406     {
00407         i->second.second = weight;
00408         reallocate( _targetNumThreads );
00409         return i->second.first.get();
00410     }
00411     else
00412     {
00413         TaskService* newService = new TaskService( "", 1 );
00414         _services[uid] = WeightedTaskService( newService, weight );
00415         reallocate( _targetNumThreads );
00416         return newService;
00417     }
00418 }
00419 
00420 TaskService*
00421 TaskServiceManager::get( UID uid ) const
00422 {
00423     ScopedLock<Mutex> lock( const_cast<TaskServiceManager*>(this)->_taskServiceMgrMutex );
00424     TaskServiceMap::const_iterator i = _services.find(uid);
00425     return i != _services.end() ? i->second.first.get() : 0L;
00426 }
00427 
00428 TaskService*
00429 TaskServiceManager::getOrAdd( UID uid, float weight ) 
00430 {
00431     TaskService* service = get( uid );
00432     return service ? service : add( uid, weight );
00433 }
00434 
00435 void
00436 TaskServiceManager::remove( TaskService* service )
00437 {
00438     ScopedLock<Mutex> lock( _taskServiceMgrMutex );
00439     for( TaskServiceMap::iterator i = _services.begin(); i != _services.end(); ++i )
00440     {
00441         if ( i->second.first.get() == service ) 
00442         {
00443             _services.erase( i );
00444             reallocate( _targetNumThreads );
00445             break;
00446         }
00447     }
00448 }
00449 
00450 void
00451 TaskServiceManager::remove( UID uid )
00452 {
00453     ScopedLock<Mutex> lock( _taskServiceMgrMutex );
00454     _services.erase( uid );
00455     reallocate( _targetNumThreads );
00456 }
00457 
00458 void
00459 TaskServiceManager::setWeight( TaskService* service, float weight )
00460 {
00461     ScopedLock<Mutex> lock( _taskServiceMgrMutex );
00462 
00463     if ( weight <= 0.0f )
00464         weight = 0.001;
00465 
00466     if ( !service )
00467         return;
00468 
00469     for( TaskServiceMap::iterator i = _services.begin(); i != _services.end(); ++i )
00470     {
00471         if ( i->second.first.get() == service )
00472         {
00473             i->second.second = weight;
00474             reallocate( _targetNumThreads );
00475             break;
00476         }
00477     }    
00478 }
00479 
00480 void
00481 TaskServiceManager::reallocate( int numThreads )
00482 {
00483     // first, total up all the weights.
00484     float totalWeight = 0.0f;
00485     for( TaskServiceMap::const_iterator i = _services.begin(); i != _services.end(); ++i )
00486         totalWeight += i->second.second;
00487 
00488     // next divide the total thread pool size by the relative weight of each service.
00489     _numThreads = 0;
00490     for( TaskServiceMap::const_iterator i = _services.begin(); i != _services.end(); ++i )
00491     {
00492         int threads = osg::maximum( 1, (int)( (float)_targetNumThreads * (i->second.second / totalWeight) ) );
00493         i->second.first->setNumThreads( threads );
00494         _numThreads += threads;
00495     }
00496 }
All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Defines