dune-fem 2.8.0
Loading...
Searching...
No Matches
mpimanager.hh
Go to the documentation of this file.
1#ifndef DUNE_FEM_MPIMANAGER_HH
2#define DUNE_FEM_MPIMANAGER_HH
3
4#if defined _OPENMP || defined(USE_PTHREADS)
5#ifndef USE_SMP_PARALLEL
6#define USE_SMP_PARALLEL
7#endif
8#endif
9
#include <algorithm>
#include <atomic>
#include <chrono>
#include <condition_variable>
#include <cstdlib>
#include <exception>
#include <functional>
#include <iostream>
#include <memory>
#include <mutex>
#include <shared_mutex>
#include <string>
#include <thread>
#include <unordered_map>
#include <vector>
17
18#include <dune/common/parallel/mpicommunication.hh>
19#include <dune/common/parallel/mpihelper.hh>
20
21#if HAVE_PETSC
23#endif
24
26
27#ifdef _OPENMP
28#include <omp.h>
29#endif
30
31namespace Dune
32{
33
34 namespace Fem
35 {
/** \brief Exception raised when code restricted to single thread mode is
 *         executed while the thread pool is not in single thread mode.
 *
 *  Only debug builds (NDEBUG not defined) keep a copy of the message passed
 *  to message(); release builds skip the copy and what() returns a fixed
 *  hint instead.
 */
class SingleThreadModeError
  : public std::exception
{
public:
#ifdef NDEBUG
  // release build: the message is intentionally discarded
  void message ( const std::string & /* msg */ ) {}
  const char *what () const noexcept override
  {
    return "SingleThreadModeError: remove -DNDEBUG to obtain a more detailed message!";
  }
#else
  // debug build: store the message so that what() can report it
  std::string msg_;
  void message ( const std::string &msg ) { msg_ = msg; }
  const char *what () const noexcept override { return msg_.c_str(); }
#endif
};
58
59 namespace detail {
/** \brief Number of threads requested through the environment.
 *
 *  With thread support (USE_SMP_PARALLEL defined) the value of the
 *  environment variable DUNE_NUM_THREADS is used if it is set, otherwise
 *  OMP_NUM_THREADS. A value read from the environment is clamped to be at
 *  least 1 (std::atoi maps non-numeric strings to 0). If neither variable is
 *  set, \p defaultValue is returned unchanged.
 *  Without thread support the result is always 1.
 *
 *  \param[in] defaultValue  value returned if no environment variable is set
 *                           (only used when USE_SMP_PARALLEL is defined)
 *  \return the requested number of threads
 */
static inline unsigned int getEnvNumberThreads ( [[maybe_unused]] unsigned int defaultValue )
{
#ifdef USE_SMP_PARALLEL
  // DUNE_NUM_THREADS (used for both OpenMP and pthreads) takes precedence over OMP_NUM_THREADS
  const char *envThreads = std::getenv( "DUNE_NUM_THREADS" );
  if( !envThreads )
    envThreads = std::getenv( "OMP_NUM_THREADS" );
  if( !envThreads )
    return defaultValue;
  return static_cast< unsigned int >( std::max( 1, std::atoi( envThreads ) ) );
#else
  return 1u;
#endif
}
82
83 class ThreadPool
84 {
85#ifndef _OPENMP
86 static const bool useStdThreads = true ;
87 static_assert( useStdThreads, "useStdThreads is disabled but OpenMP has not been found!");
88#else
89 // default to OMP
90 static const bool useStdThreads = false ;
91#endif
92
93 // maximum number of threads spawned
94 int maxThreads_;
95 // number of threads to be used in next parallel region
96 int numThreads_;
97 int activeThreads_;
98
99 std::vector<std::thread> threads_;
100 std::unordered_map<std::thread::id,int> numbers_; // still used for possible debugging can be removed if thread_local thread number works
101 std::condition_variable_any waitA_;
102 std::shared_mutex lockA_;
103 std::condition_variable_any waitB_;
104 std::shared_mutex lockB_;
105
106 // function to run
107 std::function<void(void)> run_;
108 // stop thread
109 bool finalized_;
110
111#if 1 // this doesn't work as expected
112 // store a static thread local variable for the thread number
113 static int& threadNumber_()
114 {
115 static thread_local int number = -1;
116 return number;
117 }
118#endif
119 // method executed by each thread
120 void wait(int t)
121 {
122 // set thread number (static thread local)
123 ThreadPool::threadNumber_() = t;
124
125 std::shared_lock<std::shared_mutex> lkA(lockA_);
126 std::shared_lock<std::shared_mutex> lkB(lockB_);
127
128 while (!finalized_)
129 {
130 // wait until a new task has been set or until threads are to be finalized
131 // unlock 'A' and wait until master thread either changed run_ // or finalizes
132 // reaquire the (shared) lock after that
133 while (!run_ && !finalized_)
134 waitA_.wait(lkA);
135 // check if to finalize
136 if (finalized_) break;
137 ThreadPool::threadNumber_() = t;
138 numbers_[std::this_thread::get_id()] = t;
139 // run the code is required - note that both shared locks are
140 // held so the main thread has to wait to uniquely acquire
141 // lock 'B' until 'run_' was finished by all threads
142 if (t<numThreads())
143 run_();
144 // this is the same 'waiting' done above but on the 'B' lock. In this case
145 // we wait until 'run_' has been cleared again by the main thread
146 // which can only happen after all threads have enter the
147 // 'wait' which releases the 'B' lock.
148 // This is needed to make sure a thread doesn't execute the same 'run_' twice
149 while (run_)
150 waitB_.wait(lkB);
151 }
152 }
153 template<typename F, typename... Args>
154 void runOpenMP(F&& f, Args&&... args)
155 {
156#ifdef _OPENMP
157 const int nThreads = numThreads();
158 if( nThreads == 1 )
159 {
160 f(args...);
161 return ;
162 }
163
164 std::atomic< bool > singleThreadModeError( false );
165
166 initMultiThreadMode();
167#pragma omp parallel num_threads(nThreads)
168 {
169 // set thread number to thread_local variable
170 threadNumber_() = omp_get_thread_num();
171 // execute given code in parallel
172 try
173 {
174 f(args...);
175 }
176 catch (const Dune::Fem::SingleThreadModeError& e)
177 {
178//#ifndef NDEBUG
179// std::cout << "thread[" << ThreadManager::thread() << "] " << e.what() << std::endl;
180//#endif
181 singleThreadModeError = true ;
182 }
183
184 } // end parallel region
185
186 // enter single thread mode again
187 initSingleThreadMode();
188
189 // only throw one exception to the outside world
190 if( singleThreadModeError )
191 {
192 DUNE_THROW(SingleThreadModeError, "ThreadPool::run: single thread mode violation occurred!");
193 }
194#endif
195 }
196
197 public:
198 ThreadPool()
199 : maxThreads_( std::max(1u, detail::getEnvNumberThreads( std::thread::hardware_concurrency() )) )
200 , numThreads_( detail::getEnvNumberThreads(1) )
201 , activeThreads_(1)
202 , threads_()
203 , run_(nullptr)
204 , finalized_(false)
205 {
206 // spawn max number of threads to use
207 ThreadPool::threadNumber_() = 0;
208#ifdef USE_SMP_PARALLEL
209 if constexpr( useStdThreads )
210 {
211 numbers_[std::this_thread::get_id()] = 0;
212 for (int t=1;t<maxThreads_;++t)
213 {
214 threads_.push_back( std::thread( [this,t]() { wait(t); } ) );
215 numbers_[threads_[t-1].get_id()] = t;
216 }
217 }
218#endif
219 }
220 ~ThreadPool()
221 {
222#ifdef USE_SMP_PARALLEL
223 if constexpr( useStdThreads )
224 {
225 // all threads should be in the 'start' waiting phase - notify of change of 'finalize variable
226 {
227 std::unique_lock<std::shared_mutex> lk(lockA_);
228 finalized_ = true;
229 }
230 waitA_.notify_all();
231 // join all threads
232 std::for_each(threads_.begin(),threads_.end(), std::mem_fn(&std::thread::join));
233 }
234#endif
235 }
236
237 template<typename F, typename... Args>
238 void run(F&& f, Args&&... args)
239 {
240 if (!singleThreadMode())
241 DUNE_THROW(InvalidStateException, "ThreadPool: run is running run");
242 if constexpr(! useStdThreads )
243 {
244 runOpenMP(f, args...);
245 return ;
246 }
247 if ( numThreads_==1 )
248 f(args...);
249 else
250 {
251 // the current 'master' might not be the thread used to setup the thread pool
252 numbers_[std::this_thread::get_id()] = 0;
253 // see explanation in 'wait' function
254 initMultiThreadMode();
255 std::atomic<bool> caughtException(false);
256 {
257 // acquire lock and set 'run_' - can only be done if all
258 // threads are waiting at top of while loop
259 std::lock_guard<std::shared_mutex> lkA(lockA_);
260 run_ = [&]() {
261 try { f(args...); }
262 catch (const SingleThreadModeError& e )
263 { caughtException = true; }
264 };
265 }
266 // notify all threads of new task - those will all acquire the lock (shared)
267 waitA_.notify_all();
268 // execute task on master thread
269 ThreadPool::threadNumber_() = 0;
270 run_(args...);
271 {
272 // try to acquire lock in non shared mode - this is only possible if all threads have
273 // finished the current task and are waiting at bottom of loop
274 std::lock_guard<std::shared_mutex> lkB(lockB_);
275 run_ = nullptr;
276 }
277 // notify all threads that task has been completed
278 // this moves all threads back to beginning of while loop freeing 'A'
279 waitB_.notify_all();
280
281 initSingleThreadMode();
282 if( caughtException )
283 DUNE_THROW(SingleThreadModeError, "ThreadPool::run: single thread mode violation occurred!");
284 }
285 }
286
287 int numThreads() { return numThreads_; }
288 int maxThreads() { return maxThreads_; }
289#if 0
290 int threadNumber()
291 {
292 // if (singleThreadMode())
293 // return 0;
294 int t = ThreadPool::threadNumber_();
295 assert( t>=0 );
296 return t;
297 }
298#else
299 int threadNumber()
300 {
301#ifdef _OPENMP
302 if constexpr(! useStdThreads )
303 return omp_get_thread_num();
304 else
305#endif
306 return numbers_[std::this_thread::get_id()];
307 // the following doens't work with clang since the current
308 // 'master' might not be the thread setting up this class and
309 // this method is also called without calling 'run'
310 // return numbers_.at(std::this_thread::get_id());
311 }
312#endif
313 void initSingleThreadMode() { activeThreads_ = 1; }
314 void initMultiThreadMode() { activeThreads_ = numThreads_; }
315 bool singleThreadMode() { return activeThreads_ == 1; }
316 void setNumThreads( int use )
317 {
318 if ( !singleThreadMode() )
319 DUNE_THROW(SingleThreadModeError, "ThreadPool: number of threads can only be changed in single thread mode!");
320 if ( use > maxThreads_ )
321 {
322 std::cout << "Warning: requesting more threads than available."
323 << " Maximum number of threads was restricted to " << maxThreads_
324 << " at startup. Setting to maximum number instead.\n";
325 use = maxThreads_;
326 // DUNE_THROW(InvalidStateException, "ThreadPool: trying to set number of threads above the fixed maximum number");
327 }
328 numThreads_ = use;
329 }
330 bool isMainThread() { return threadNumber() == 0; }
331 };
332
333 } // end namespace detail
334
335
337 {
338 typedef Dune::CollectiveCommunication< MPIHelper::MPICommunicator >
340 private:
341 static MPIManager &instance ()
342 {
344 }
345
// returns true if MPI_Finalize has already been called (always false without MPI)
static bool mpiFinalized ()
{
#if HAVE_MPI
  // query MPI; any non-zero flag means MPI was finalized
  int flag = -1;
  MPI_Finalized( &flag );
  return flag != 0;
#else
  return false;
#endif // #if HAVE_MPI
}
359
360 public:
363 {
364 _finalize();
365 }
366
368 {
369 if( ! mpiFinalized() )
370 {
371#if HAVE_PETSC
372 if( petscWasInitializedHere_ )
373 ::Dune::Petsc::finalize();
374#endif
375 // if MPI_Init was called here and finalize has not been
376 // called yet, then this is the place to call it
377 if( wasInitializedHere_ )
378 {
379#if HAVE_MPI
380 MPI_Finalize();
381#endif
382 }
383 }
384 }
385
386 static void finalize()
387 {
388 instance()._finalize();
389 }
390
391 static void initialize ( int &argc, char **&argv );
392
394 {
395 const std::unique_ptr< CollectiveCommunication > &comm = instance().comm_;
396 if( !comm )
397 DUNE_THROW( InvalidStateException, "MPIManager has not been initialized." );
398 return *comm;
399 }
400
401 static int rank ()
402 {
403 return comm().rank();
404 }
405
406 static int size ()
407 {
408 return comm().size();
409 }
410
412 static inline void initSingleThreadMode() { instance().pool_.initSingleThreadMode(); }
413
415 static inline void initMultiThreadMode() { instance().pool_.initMultiThreadMode(); }
416
418 static int maxThreads() { return instance().pool_.maxThreads(); }
419
421 static int numThreads() { return instance().pool_.numThreads(); }
422
424 static int thread() { return instance().pool_.threadNumber(); }
425
427 static bool isMainThread() { return instance().pool_.isMainThread(); }
428
429 [[deprecated("use isMainThread() instead!")]]
430 static bool isMaster() { return isMainThread(); }
431
433 static void setNumThreads( int use ) { instance().pool_.setNumThreads(use); }
434
436 static bool singleThreadMode() { return instance().pool_.singleThreadMode(); }
437
439 template<typename F, typename... Args>
440 static void run(F&& f, Args&&... args) { instance().pool_.run(f,args...); }
441
442 private:
443 MPIHelper *helper_ = nullptr;
444 std::unique_ptr< CollectiveCommunication > comm_;
445 bool wasInitializedHere_ = false ;
446#if HAVE_PETSC
447 bool petscWasInitializedHere_ = false ;
448#endif
449 detail::ThreadPool pool_;
450 };
451
454
455 } // namespace Fem
456
457} // namespace Dune
458
460
461namespace Dune
462{
463 namespace Fem
464 {
465 class QuadratureStorageRegistry;
466
467 inline void MPIManager::initialize ( int &argc, char **&argv )
468 {
469 MPIHelper *&helper = instance().helper_;
470 std::unique_ptr< CollectiveCommunication > &comm = instance().comm_;
471
472 // the following initialization overrides the MPI_Init in dune-common
473 // to avoid a call to MPI_Finalize before all singletons have been deleted
474#if HAVE_MPI
475 int wasInitialized = -1;
476 MPI_Initialized( &wasInitialized );
477 if(!wasInitialized)
478 {
479#ifndef USE_SMP_PARALLEL
480 // standard MPI_Init
481 // call normal MPI_Init here to prevent MPIHelper to interfering
482 // with MPI_Finalize one program exit which would cause failure
483 {
484 int is_initialized = MPI_Init(&argc, &argv);
485 if( is_initialized != MPI_SUCCESS )
486 DUNE_THROW(InvalidStateException,"MPI_Init failed!");
487 }
488#else // threaded init
489 {
490 int provided;
491 // use MPI_Init_thread for hybrid parallel programs
492 int is_initialized = MPI_Init_thread(&argc, &argv, MPI_THREAD_FUNNELED, &provided );
493
494 if( is_initialized != MPI_SUCCESS )
495 DUNE_THROW(InvalidStateException,"MPI_Init_thread failed!");
496
497#if not defined NDEBUG && defined DUNE_DEVEL_MODE
498 // for OpenMPI provided seems to be MPI_THREAD_SINGLE
499 // but the bybrid version still works. On BlueGene systems
500 // the MPI_THREAD_FUNNELED is really needed
501 if( provided != MPI_THREAD_FUNNELED )
502 {
503 if( provided == MPI_THREAD_SINGLE )
504 dwarn << "MPI thread support = single (instead of funneled)!" << std::endl;
505 else
506 dwarn << "WARNING: MPI thread support = " << provided << " != MPI_THREAD_FUNNELED " << MPI_THREAD_FUNNELED << std::endl;
507 }
508#endif // end NDEBUG
509 }
510#endif // end USE_SMP_PARALLEL
511 instance().wasInitializedHere_ = true;
512
513 } // end if(!wasInitialized)
514#endif // end HAVE_MPI
515
516 // if already initialized, do nothing further
517 if( helper && comm )
518 return ;
519
520 // this will just initialize the static variables inside MPIHelper but
521 // not call MPI_Init again
522 helper = &MPIHelper::instance( argc, argv );
523 comm.reset( new CollectiveCommunication( helper->getCommunicator() ) );
524
525#if HAVE_PETSC
526 // initialize PETSc if present
527 // returns true if PETSc was initialized during this call
528 instance().petscWasInitializedHere_ =
529 ::Dune::Petsc::initialize( rank() == 0, argc, argv );
530#endif
531
532 // initialize static variables of QuadratureStorageRegistry
534 }
535 }
536}
537
538#endif // #ifndef DUNE_FEM_MPIMANAGER_HH
STL namespace.
double max(const Dune::Fem::Double &v, const double p)
Definition: double.hh:965
Definition: bindguard.hh:11
MPIManager ThreadPool
Definition: mpimanager.hh:453
static double max(const Double &v, const double p)
Definition: double.hh:398
Exception thrown when a code segment that is supposed to be only accessed in single thread mode is accessed in multi thread mode.
Definition: mpimanager.hh:43
void message(const std::string &msg)
Definition: mpimanager.hh:48
const char * what() const noexcept override
Definition: mpimanager.hh:49
std::string msg_
Definition: mpimanager.hh:47
Definition: mpimanager.hh:337
static void run(F &&f, Args &&... args)
run functor f with given arguments args in threaded mode
Definition: mpimanager.hh:440
static const CollectiveCommunication & comm()
Definition: mpimanager.hh:393
static int thread()
return thread number
Definition: mpimanager.hh:424
static void finalize()
Definition: mpimanager.hh:386
static bool isMainThread()
return true if the current thread is the main thread (i.e. thread 0)
Definition: mpimanager.hh:427
Dune::CollectiveCommunication< MPIHelper::MPICommunicator > CollectiveCommunication
Definition: mpimanager.hh:339
static int maxThreads()
return maximal number of threads possible in the current run
Definition: mpimanager.hh:418
static bool isMaster()
Definition: mpimanager.hh:430
static bool singleThreadMode()
returns true if program is operating on one thread currently
Definition: mpimanager.hh:436
static void initMultiThreadMode()
initialize multi thread mode (when in single thread mode)
Definition: mpimanager.hh:415
static void setNumThreads(int use)
set number of threads available during next run
Definition: mpimanager.hh:433
static void initSingleThreadMode()
initialize single thread mode (when in multithread mode)
Definition: mpimanager.hh:412
~MPIManager()
destructor calling finalize if this has not been done
Definition: mpimanager.hh:362
static int size()
Definition: mpimanager.hh:406
static int rank()
Definition: mpimanager.hh:401
static void initialize(int &argc, char **&argv)
Definition: mpimanager.hh:467
void _finalize()
Definition: mpimanager.hh:367
static int numThreads()
return number of current threads
Definition: mpimanager.hh:421
static void initialize()
initialize static variables
Definition: registry.hh:64
return singleton instance of given Object type.
Definition: singleton.hh:88