osgEarth 2.1.1
|
00001 /* -*-c++-*- */ 00002 /* osgEarth - Dynamic map generation toolkit for OpenSceneGraph 00003 * Copyright 2008-2010 Pelican Mapping 00004 * http://osgearth.org 00005 * 00006 * osgEarth is free software; you can redistribute it and/or modify 00007 * it under the terms of the GNU Lesser General Public License as published by 00008 * the Free Software Foundation; either version 2 of the License, or 00009 * (at your option) any later version. 00010 * 00011 * This program is distributed in the hope that it will be useful, 00012 * but WITHOUT ANY WARRANTY; without even the implied warranty of 00013 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 00014 * GNU Lesser General Public License for more details. 00015 * 00016 * You should have received a copy of the GNU Lesser General Public License 00017 * along with this program. If not, see <http://www.gnu.org/licenses/> 00018 */ 00019 #ifndef OSGEARTH_TASK_SERVICE 00020 #define OSGEARTH_TASK_SERVICE 1 00021 00022 #include <osgEarth/Common> 00023 #include <osgEarth/HTTPClient> 00024 #include <osgEarth/ThreadingUtils> 00025 #include <osg/Referenced> 00026 #include <osg/Timer> 00027 #include <queue> 00028 #include <list> 00029 #include <string> 00030 #include <map> 00031 00032 namespace osgEarth 00033 { 00034 class OSGEARTH_EXPORT TaskRequest : public osg::Referenced 00035 { 00036 public: 00037 enum State { 00038 STATE_IDLE, 00039 STATE_PENDING, 00040 STATE_IN_PROGRESS, 00041 STATE_COMPLETED 00042 }; 00043 public: 00044 TaskRequest( float priority =0.0f ); 00045 00046 // the actual task code 00047 virtual void operator()( ProgressCallback* progress ) =0; 00048 00049 void run(); 00050 void cancel(); 00051 00052 bool isIdle() const { return _state == STATE_IDLE; } 00053 bool isPending() const { return _state == STATE_PENDING; } 00054 bool isCompleted() const { return _state == STATE_COMPLETED; } 00055 bool isInProgress() const { return _state == STATE_IN_PROGRESS; } 00056 bool isRunning() const { return isPending() || isInProgress(); } 00057 00058 bool wasCanceled() const; 00059 00060 void setPriority( float value ) { _priority = value; } 00061 float getPriority() const { return _priority; } 00062 State getState() const { return _state; } 00063 void setState(State s) { _state = s; } 00064 void setStamp(int stamp) { _stamp = stamp; } 00065 int getStamp() const { return _stamp; } 00066 osg::Referenced* getResult() const { return _result.get(); } 00067 ProgressCallback* getProgressCallback() { return _progress.get(); } 00068 void setProgressCallback(ProgressCallback* progress) { _progress = progress? progress : new ProgressCallback(); } 00069 const std::string& getName() const { return _name; } 00070 void setName( const std::string& name ) { _name = name; } 00071 void reset() { _result = 0L; } 00072 osg::Timer_t startTime() const { return _startTime; } 00073 osg::Timer_t endTime() const { return _endTime; } 00074 double runTime() const { return osg::Timer::instance()->delta_s(_startTime,_endTime); } 00075 00076 void setCompletedEvent( Threading::Event* value ) { _completedEvent = value; } 00077 Threading::Event* getCompletedEvent() const { return _completedEvent; } 00078 00079 protected: 00080 float _priority; 00081 volatile State _state; 00082 volatile int _stamp; 00083 osg::ref_ptr<osg::Referenced> _result; 00084 osg::ref_ptr< ProgressCallback > _progress; 00085 std::string _name; 00086 osg::Timer_t _startTime; 00087 osg::Timer_t _endTime; 00088 Threading::Event* _completedEvent; 00089 }; 00090 00091 typedef std::list< osg::ref_ptr<TaskRequest> > TaskRequestList; 00092 00093 typedef std::vector< osg::ref_ptr<TaskRequest> > TaskRequestVector; 00094 00095 typedef std::multimap< float, osg::ref_ptr<TaskRequest> > TaskRequestPriorityMap; 00096 00102 template<typename T> 00103 struct ParallelTask : public TaskRequest, T 00104 { 00105 ParallelTask() : _mev(0L), _sev(0L) { } 00106 ParallelTask( Threading::MultiEvent* ev ) : _mev(ev), _sev(0L) { } 00107 ParallelTask( Threading::Event* ev ) : _sev(ev), _mev(0L) { } 00108 00109 void operator()( ProgressCallback* pc ) 00110 { 00111 this->execute(); 00112 if ( _mev ) 00113 _mev->notify(); 00114 else if ( _sev ) 00115 _sev->set(); 00116 } 00117 00118 Threading::MultiEvent* _mev; 00119 Threading::Event* _sev; 00120 }; 00121 00122 class TaskRequestQueue : public osg::Referenced 00123 { 00124 public: 00125 TaskRequestQueue(); 00126 00127 void add( TaskRequest* request ); 00128 TaskRequest* get(); 00129 void clear(); 00130 00131 void setDone(); 00132 00133 void setStamp( int value ) { _stamp = value; } 00134 int getStamp() const { return _stamp; } 00135 00136 unsigned int getNumRequests() const; 00137 00138 private: 00139 TaskRequestPriorityMap _requests; 00140 OpenThreads::Mutex _mutex; 00141 OpenThreads::Condition _cond; 00142 volatile bool _done; 00143 00144 int _stamp; 00145 }; 00146 00147 struct TaskThread : public OpenThreads::Thread 00148 { 00149 TaskThread( TaskRequestQueue* queue ); 00150 bool getDone() { return _done;} 00151 void setDone( bool done) { _done = done; } 00152 void run(); 00153 int cancel(); 00154 00155 private: 00156 osg::ref_ptr<TaskRequestQueue> _queue; 00157 osg::ref_ptr<TaskRequest> _request; 00158 volatile bool _done; 00159 }; 00160 00164 class OSGEARTH_EXPORT TaskService : public osg::Referenced 00165 { 00166 public: 00167 TaskService( const std::string& name ="", int numThreads =4 ); 00168 00169 void add( TaskRequest* request ); 00170 00171 void setName( const std::string& value ) { _name = value; } 00172 const std::string& getName() const { return _name; } 00173 00174 int getStamp() const; 00175 void setStamp( int stamp ); 00176 00177 int getNumThreads() const; 00178 void setNumThreads( int numThreads ); 00179 00183 unsigned int getNumRequests() const; 00184 00185 private: 00186 void adjustThreadCount(); 00187 void removeFinishedThreads(); 00188 00189 OpenThreads::ReentrantMutex _threadMutex; 00190 typedef std::list<TaskThread*> TaskThreads; 00191 TaskThreads _threads; 00192 osg::ref_ptr<TaskRequestQueue> _queue; 00193 int _numThreads; 00194 int _lastRemoveFinishedThreadsStamp; 00195 std::string _name; 00196 virtual ~TaskService(); 00197 }; 00198 00203 class OSGEARTH_EXPORT TaskServiceManager : public osg::Referenced 00204 { 00205 public: 00210 TaskServiceManager( int numThreads =4 ); 00211 00217 void setNumThreads( int numThreads ); 00218 00223 int getNumThreads() const { return _numThreads; } 00224 00229 TaskService* add( UID uid, float weight =1.0f ); 00230 00234 TaskService* get( UID uid ) const; 00235 00239 TaskService* getOrAdd( UID uid, float weight =1.0f ); 00240 00245 void remove( TaskService* service ); 00246 void remove( UID uid ); 00247 00252 void setWeight( TaskService* service, float weight ); 00253 00254 private: 00255 typedef std::pair< osg::ref_ptr<TaskService>, float > WeightedTaskService; 00256 typedef std::map< UID, WeightedTaskService > TaskServiceMap; 00257 TaskServiceMap _services; 00258 int _numThreads, _targetNumThreads; 00259 OpenThreads::Mutex _taskServiceMgrMutex; 00260 00261 void reallocate( int targetNumThreads ); 00262 }; 00263 } 00264 00265 #endif // OSGEARTH_TASK_SERVICE 00266