osgEarth 2.1.1

/home/cube/sources/osgearth/src/osgEarth/TaskService

Go to the documentation of this file.
00001 /* -*-c++-*- */
00002 /* osgEarth - Dynamic map generation toolkit for OpenSceneGraph
00003  * Copyright 2008-2010 Pelican Mapping
00004  * http://osgearth.org
00005  *
00006  * osgEarth is free software; you can redistribute it and/or modify
00007  * it under the terms of the GNU Lesser General Public License as published by
00008  * the Free Software Foundation; either version 2 of the License, or
00009  * (at your option) any later version.
00010  *
00011  * This program is distributed in the hope that it will be useful,
00012  * but WITHOUT ANY WARRANTY; without even the implied warranty of
00013  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
00014  * GNU Lesser General Public License for more details.
00015  *
00016  * You should have received a copy of the GNU Lesser General Public License
00017  * along with this program.  If not, see <http://www.gnu.org/licenses/>
00018  */
00019 #ifndef OSGEARTH_TASK_SERVICE
00020 #define OSGEARTH_TASK_SERVICE 1
00021 
00022 #include <osgEarth/Common>
00023 #include <osgEarth/HTTPClient>
00024 #include <osgEarth/ThreadingUtils>
00025 #include <osg/Referenced>
00026 #include <osg/Timer>
00027 #include <queue>
00028 #include <list>
00029 #include <string>
00030 #include <map>
00031 
00032 namespace osgEarth
00033 {
00034     class OSGEARTH_EXPORT TaskRequest : public osg::Referenced
00035     {
00036     public:
00037         enum State {
00038             STATE_IDLE,
00039             STATE_PENDING,
00040             STATE_IN_PROGRESS,
00041             STATE_COMPLETED
00042         };
00043     public:
00044         TaskRequest( float priority =0.0f );
00045 
00046         // the actual task code
00047         virtual void operator()( ProgressCallback* progress ) =0;
00048 
00049         void run();
00050         void cancel();
00051 
00052         bool isIdle() const { return _state == STATE_IDLE; }
00053         bool isPending() const { return _state == STATE_PENDING; }
00054         bool isCompleted() const { return _state == STATE_COMPLETED; }
00055         bool isInProgress() const { return _state == STATE_IN_PROGRESS; }
00056         bool isRunning() const { return isPending() || isInProgress(); }
00057 
00058         bool wasCanceled() const;
00059 
00060         void setPriority( float value ) { _priority = value; }
00061         float getPriority() const { return _priority; }
00062         State getState() const { return _state; }
00063         void setState(State s) { _state = s; }
00064         void setStamp(int stamp) { _stamp = stamp; }
00065         int getStamp() const { return _stamp; }
00066         osg::Referenced* getResult() const { return _result.get(); }
00067         ProgressCallback* getProgressCallback() { return _progress.get(); }
00068         void setProgressCallback(ProgressCallback* progress) { _progress = progress? progress : new ProgressCallback(); }
00069         const std::string& getName() const { return _name; }
00070         void setName( const std::string& name ) { _name = name; }
00071         void reset() { _result = 0L; }
00072         osg::Timer_t startTime() const { return _startTime; }
00073         osg::Timer_t endTime() const { return _endTime; }
00074         double runTime() const { return osg::Timer::instance()->delta_s(_startTime,_endTime); }
00075 
00076         void setCompletedEvent( Threading::Event* value ) { _completedEvent = value; }
00077         Threading::Event* getCompletedEvent() const { return _completedEvent; }
00078 
00079     protected:
00080         float _priority;
00081         volatile State _state;
00082         volatile int _stamp;
00083         osg::ref_ptr<osg::Referenced> _result;
00084         osg::ref_ptr< ProgressCallback > _progress;
00085         std::string _name;
00086         osg::Timer_t _startTime;
00087         osg::Timer_t _endTime;
00088         Threading::Event* _completedEvent;
00089     };
00090 
00091     typedef std::list< osg::ref_ptr<TaskRequest> > TaskRequestList;
00092 
00093     typedef std::vector< osg::ref_ptr<TaskRequest> > TaskRequestVector;
00094 
00095     typedef std::multimap< float, osg::ref_ptr<TaskRequest> > TaskRequestPriorityMap;
00096     
00102     template<typename T>
00103     struct ParallelTask : public TaskRequest, T
00104     {
00105         ParallelTask() : _mev(0L), _sev(0L) { }
00106         ParallelTask( Threading::MultiEvent* ev ) : _mev(ev), _sev(0L) { }
00107         ParallelTask( Threading::Event* ev ) : _sev(ev), _mev(0L) { }
00108 
00109         void operator()( ProgressCallback* pc ) 
00110         {
00111             this->execute();
00112             if ( _mev )
00113                 _mev->notify();
00114             else if ( _sev )
00115                 _sev->set();
00116         }
00117 
00118         Threading::MultiEvent* _mev;
00119         Threading::Event*      _sev;
00120     };
00121 
00122     class TaskRequestQueue : public osg::Referenced
00123     {
00124     public:
00125         TaskRequestQueue();
00126 
00127         void add( TaskRequest* request );
00128         TaskRequest* get();
00129         void clear();
00130 
00131         void setDone();
00132 
00133         void setStamp( int value ) { _stamp = value; }
00134         int getStamp() const { return _stamp; }
00135 
00136         unsigned int getNumRequests() const;
00137 
00138     private:
00139         TaskRequestPriorityMap _requests;
00140         OpenThreads::Mutex _mutex;
00141         OpenThreads::Condition _cond;
00142         volatile bool _done;
00143 
00144         int _stamp;
00145     };
00146     
00147     struct TaskThread : public OpenThreads::Thread
00148     {
00149         TaskThread( TaskRequestQueue* queue );
00150         bool getDone() { return _done;}
00151         void setDone( bool done) { _done = done; }
00152         void run();
00153         int cancel();
00154 
00155     private:
00156         osg::ref_ptr<TaskRequestQueue> _queue;
00157         osg::ref_ptr<TaskRequest> _request;
00158         volatile bool _done;
00159     };
00160 
00164     class OSGEARTH_EXPORT TaskService : public osg::Referenced
00165     {
00166     public:
00167         TaskService( const std::string& name ="", int numThreads =4 );
00168 
00169         void add( TaskRequest* request );
00170 
00171         void setName( const std::string& value ) { _name = value; }
00172         const std::string& getName() const { return _name; }
00173 
00174         int getStamp() const;
00175         void setStamp( int stamp );
00176 
00177         int getNumThreads() const;
00178         void setNumThreads( int numThreads );
00179 
00183         unsigned int getNumRequests() const;
00184 
00185     private:
00186         void adjustThreadCount();
00187         void removeFinishedThreads();
00188 
00189         OpenThreads::ReentrantMutex _threadMutex;
00190         typedef std::list<TaskThread*> TaskThreads;
00191         TaskThreads _threads;
00192         osg::ref_ptr<TaskRequestQueue> _queue;
00193         int _numThreads;
00194         int _lastRemoveFinishedThreadsStamp;
00195         std::string _name;
00196         virtual ~TaskService();
00197     };
00198 
00203     class OSGEARTH_EXPORT TaskServiceManager : public osg::Referenced
00204     {
00205     public:
00210         TaskServiceManager( int numThreads =4 );
00211 
00217         void setNumThreads( int numThreads );
00218 
00223         int getNumThreads() const { return _numThreads; }
00224 
00229         TaskService* add( UID uid, float weight =1.0f );
00230 
00234         TaskService* get( UID uid ) const;
00235 
00239         TaskService* getOrAdd( UID uid, float weight =1.0f );
00240 
00245         void remove( TaskService* service );
00246         void remove( UID uid );
00247 
00252         void setWeight( TaskService* service, float weight );
00253 
00254     private:
00255         typedef std::pair< osg::ref_ptr<TaskService>, float > WeightedTaskService;
00256         typedef std::map< UID, WeightedTaskService > TaskServiceMap;
00257         TaskServiceMap _services;
00258         int _numThreads, _targetNumThreads;
00259         OpenThreads::Mutex _taskServiceMgrMutex;
00260 
00261         void reallocate( int targetNumThreads );
00262     };
00263 }
00264 
00265 #endif // OSGEARTH_TASK_SERVICE
00266 
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Defines