1#ifndef DUNE_FEM_MPIMANAGER_HH
2#define DUNE_FEM_MPIMANAGER_HH
4#if defined _OPENMP || defined(USE_PTHREADS)
5#ifndef USE_SMP_PARALLEL
6#define USE_SMP_PARALLEL
11#include <condition_variable>
15#include <shared_mutex>
18#include <dune/common/parallel/mpicommunication.hh>
19#include <dune/common/parallel/mpihelper.hh>
49 const char*
what() const noexcept
override {
return msg_.c_str(); }
51 void message(
const std::string &msg) {}
52 const char*
what() const noexcept
override
54 return "SingleThreadModeError: remove -DNDEBUG to obtain a more detailed message!";
63 static inline const unsigned int getEnvNumberThreads (
unsigned int defaultValue)
65#ifdef USE_SMP_PARALLEL
66 unsigned int maxThreads = defaultValue;
68 const char* mThreads = std::getenv(
"DUNE_NUM_THREADS");
70 maxThreads =
std::max(
int(1), atoi( mThreads ) );
73 const char* mThreads = std::getenv(
"OMP_NUM_THREADS");
75 maxThreads =
std::max(
int(1), atoi( mThreads ) );
78 unsigned int maxThreads = 1;
86 static const bool useStdThreads = true ;
87 static_assert( useStdThreads,
"useStdThreads is disabled but OpenMP has not been found!");
90 static const bool useStdThreads = false ;
99 std::vector<std::thread> threads_;
100 std::unordered_map<std::thread::id,int> numbers_;
101 std::condition_variable_any waitA_;
102 std::shared_mutex lockA_;
103 std::condition_variable_any waitB_;
104 std::shared_mutex lockB_;
107 std::function<void(
void)> run_;
113 static int& threadNumber_()
115 static thread_local int number = -1;
123 ThreadPool::threadNumber_() = t;
125 std::shared_lock<std::shared_mutex> lkA(lockA_);
126 std::shared_lock<std::shared_mutex> lkB(lockB_);
133 while (!run_ && !finalized_)
136 if (finalized_)
break;
137 ThreadPool::threadNumber_() = t;
138 numbers_[std::this_thread::get_id()] = t;
153 template<
typename F,
typename... Args>
154 void runOpenMP(F&& f, Args&&... args)
157 const int nThreads = numThreads();
164 std::atomic< bool > singleThreadModeError(
false );
166 initMultiThreadMode();
167#pragma omp parallel num_threads(nThreads)
170 threadNumber_() = omp_get_thread_num();
181 singleThreadModeError = true ;
187 initSingleThreadMode();
190 if( singleThreadModeError )
192 DUNE_THROW(SingleThreadModeError,
"ThreadPool::run: single thread mode violation occurred!");
199 : maxThreads_(
std::
max(1u, detail::getEnvNumberThreads(
std::thread::hardware_concurrency() )) )
200 , numThreads_( detail::getEnvNumberThreads(1) )
207 ThreadPool::threadNumber_() = 0;
208#ifdef USE_SMP_PARALLEL
209 if constexpr( useStdThreads )
211 numbers_[std::this_thread::get_id()] = 0;
212 for (
int t=1;t<maxThreads_;++t)
214 threads_.push_back( std::thread( [
this,t]() { wait(t); } ) );
215 numbers_[threads_[t-1].get_id()] = t;
222#ifdef USE_SMP_PARALLEL
223 if constexpr( useStdThreads )
227 std::unique_lock<std::shared_mutex> lk(lockA_);
232 std::for_each(threads_.begin(),threads_.end(), std::mem_fn(&std::thread::join));
237 template<
typename F,
typename... Args>
238 void run(F&& f, Args&&... args)
240 if (!singleThreadMode())
241 DUNE_THROW(InvalidStateException,
"ThreadPool: run is running run");
242 if constexpr(! useStdThreads )
244 runOpenMP(f, args...);
247 if ( numThreads_==1 )
252 numbers_[std::this_thread::get_id()] = 0;
254 initMultiThreadMode();
255 std::atomic<bool> caughtException(
false);
259 std::lock_guard<std::shared_mutex> lkA(lockA_);
262 catch (
const SingleThreadModeError& e )
263 { caughtException =
true; }
269 ThreadPool::threadNumber_() = 0;
274 std::lock_guard<std::shared_mutex> lkB(lockB_);
281 initSingleThreadMode();
282 if( caughtException )
283 DUNE_THROW(SingleThreadModeError,
"ThreadPool::run: single thread mode violation occurred!");
287 int numThreads() {
return numThreads_; }
288 int maxThreads() {
return maxThreads_; }
294 int t = ThreadPool::threadNumber_();
302 if constexpr(! useStdThreads )
303 return omp_get_thread_num();
306 return numbers_[std::this_thread::get_id()];
313 void initSingleThreadMode() { activeThreads_ = 1; }
314 void initMultiThreadMode() { activeThreads_ = numThreads_; }
315 bool singleThreadMode() {
return activeThreads_ == 1; }
316 void setNumThreads(
int use )
318 if ( !singleThreadMode() )
319 DUNE_THROW(SingleThreadModeError,
"ThreadPool: number of threads can only be changed in single thread mode!");
320 if ( use > maxThreads_ )
322 std::cout <<
"Warning: requesting more threads than available."
323 <<
" Maximum number of threads was restricted to " << maxThreads_
324 <<
" at startup. Setting to maximum number instead.\n";
330 bool isMainThread() {
return threadNumber() == 0; }
338 typedef Dune::CollectiveCommunication< MPIHelper::MPICommunicator >
346 static bool mpiFinalized ()
348 bool finalized = false ;
352 int wasFinalized = -1;
353 MPI_Finalized( &wasFinalized );
354 finalized = bool( wasFinalized );
369 if( ! mpiFinalized() )
372 if( petscWasInitializedHere_ )
373 ::Dune::Petsc::finalize();
377 if( wasInitializedHere_ )
391 static void initialize (
int &argc,
char **&argv );
395 const std::unique_ptr< CollectiveCommunication > &
comm = instance().comm_;
397 DUNE_THROW( InvalidStateException,
"MPIManager has not been initialized." );
403 return comm().rank();
408 return comm().size();
418 static int maxThreads() {
return instance().pool_.maxThreads(); }
421 static int numThreads() {
return instance().pool_.numThreads(); }
424 static int thread() {
return instance().pool_.threadNumber(); }
429 [[deprecated(
"use isMainThread() instead!")]]
433 static void setNumThreads(
int use ) { instance().pool_.setNumThreads(use); }
439 template<
typename F,
typename... Args>
440 static void run(F&& f, Args&&... args) { instance().pool_.run(f,args...); }
443 MPIHelper *helper_ =
nullptr;
444 std::unique_ptr< CollectiveCommunication > comm_;
445 bool wasInitializedHere_ = false ;
447 bool petscWasInitializedHere_ = false ;
449 detail::ThreadPool pool_;
465 class QuadratureStorageRegistry;
469 MPIHelper *&helper = instance().helper_;
470 std::unique_ptr< CollectiveCommunication > &
comm = instance().comm_;
475 int wasInitialized = -1;
476 MPI_Initialized( &wasInitialized );
479#ifndef USE_SMP_PARALLEL
484 int is_initialized = MPI_Init(&argc, &argv);
485 if( is_initialized != MPI_SUCCESS )
486 DUNE_THROW(InvalidStateException,
"MPI_Init failed!");
492 int is_initialized = MPI_Init_thread(&argc, &argv, MPI_THREAD_FUNNELED, &provided );
494 if( is_initialized != MPI_SUCCESS )
495 DUNE_THROW(InvalidStateException,
"MPI_Init_thread failed!");
497#if not defined NDEBUG && defined DUNE_DEVEL_MODE
501 if( provided != MPI_THREAD_FUNNELED )
503 if( provided == MPI_THREAD_SINGLE )
504 dwarn <<
"MPI thread support = single (instead of funneled)!" << std::endl;
506 dwarn <<
"WARNING: MPI thread support = " << provided <<
" != MPI_THREAD_FUNNELED " << MPI_THREAD_FUNNELED << std::endl;
511 instance().wasInitializedHere_ =
true;
522 helper = &MPIHelper::instance( argc, argv );
528 instance().petscWasInitializedHere_ =
529 ::Dune::Petsc::initialize(
rank() == 0, argc, argv );
double max(const Dune::Fem::Double &v, const double p)
Definition: double.hh:965
Definition: bindguard.hh:11
MPIManager ThreadPool
Definition: mpimanager.hh:453
static double max(const Double &v, const double p)
Definition: double.hh:398
Exception thrown when a code segment that is supposed to be only accessed in single thread mode is ac...
Definition: mpimanager.hh:43
void message(const std::string &msg)
Definition: mpimanager.hh:48
const char * what() const noexcept override
Definition: mpimanager.hh:49
std::string msg_
Definition: mpimanager.hh:47
Definition: mpimanager.hh:337
static void run(F &&f, Args &&... args)
run functor f with given arguments args in threaded mode
Definition: mpimanager.hh:440
static const CollectiveCommunication & comm()
Definition: mpimanager.hh:393
static int thread()
return thread number
Definition: mpimanager.hh:424
static void finalize()
Definition: mpimanager.hh:386
static bool isMainThread()
return true if the current thread is the main thread (i.e. thread 0)
Definition: mpimanager.hh:427
Dune::CollectiveCommunication< MPIHelper::MPICommunicator > CollectiveCommunication
Definition: mpimanager.hh:339
static int maxThreads()
return maximal number of threads possible in the current run
Definition: mpimanager.hh:418
static bool isMaster()
Definition: mpimanager.hh:430
static bool singleThreadMode()
returns true if program is operating on one thread currently
Definition: mpimanager.hh:436
static void initMultiThreadMode()
initialize multi thread mode (when in single thread mode)
Definition: mpimanager.hh:415
static void setNumThreads(int use)
set number of threads available during next run
Definition: mpimanager.hh:433
static void initSingleThreadMode()
initialize single thread mode (when in multithread mode)
Definition: mpimanager.hh:412
~MPIManager()
destructor calling finalize if this has not been done
Definition: mpimanager.hh:362
static int size()
Definition: mpimanager.hh:406
static int rank()
Definition: mpimanager.hh:401
static void initialize(int &argc, char **&argv)
Definition: mpimanager.hh:467
void _finalize()
Definition: mpimanager.hh:367
static int numThreads()
return number of current threads
Definition: mpimanager.hh:421
static void initialize()
initialize static variables
Definition: registry.hh:64
return singleton instance of given Object type.
Definition: singleton.hh:88