dune-fem 2.8.0
Loading...
Searching...
No Matches
cachedcommmanager.hh
Go to the documentation of this file.
1#ifndef DUNE_FEM_CACHED_COMMUNICATION_MANAGER_HH
2#define DUNE_FEM_CACHED_COMMUNICATION_MANAGER_HH
3
4#include <cassert>
5#include <cstddef>
6
7//- system includes
8#include <iostream>
9#include <map>
10#include <queue>
11#include <memory>
12#include <vector>
13
14//- dune-common includes
15#include <dune/common/math.hh>
16#include <dune/common/timer.hh>
17#include <dune/common/visibility.hh>
18
19//- dune-grid includes
20#include <dune/grid/common/grid.hh>
21#include <dune/grid/common/datahandleif.hh>
22#include <dune/grid/utility/entitycommhelper.hh>
23
24// include alugrid headers to have to communicator class from ALUGrid
25#if HAVE_DUNE_ALUGRID
26#include <dune/alugrid/3d/alu3dinclude.hh>
27#endif
28
29//- dune-fem includes
36
37namespace Dune
38{
39
40 namespace Fem
41 {
42
43 // External Forward Declarations
44 // -----------------------------
45
46 template< class DiscreteFunctionSpace >
48
50
51
52
57// only if ALUGrid found and was build for parallel runs
58// if HAVE_DUNE_ALUGRID is not defined, ALU3DGRID_PARALLEL shouldn't be either
59#if ALU3DGRID_PARALLEL
60
67 template< class BlockMapper >
68 class DependencyCache
69 {
70 public:
73 typedef BlockMapper BlockMapperType;
74
75 protected:
76 // type of communication indices
77 typedef CommunicationIndexMap IndexMapType;
78
79 // type of IndexMapVector
80 typedef std::vector< IndexMapType > IndexMapVectorType;
81
82 // type of set of links
83 typedef std :: set< int > LinkStorageType;
84
85 // ALUGrid send/recv buffers
86 typedef ALU3DSPACE ObjectStream ObjectStreamType;
87
88 // type of communicator
89 typedef ALU3DSPACE MpAccessLocal MPAccessInterfaceType;
90 // type of communication implementation
91 typedef ALU3DSPACE MpAccessMPI MPAccessImplType;
92
94 typedef std :: vector< ObjectStreamType > ObjectStreamVectorType;
95
96 protected:
97 const InterfaceType interface_;
98 const CommunicationDirection dir_;
99
100 LinkStorageType linkStorage_;
101
102 IndexMapVectorType recvIndexMap_;
103 IndexMapVectorType sendIndexMap_;
104
105 // ALUGrid communicator Class
106 std::unique_ptr< MPAccessInterfaceType > mpAccess_;
107
108 // exchange time
109 double exchangeTime_;
110 // setup time
111 double buildTime_;
112
114 int sequence_;
115
116 int nonBlockingObjects_ ;
117
118 protected:
119 template< class Communication, class LinkStorage,
120 class IndexMapVector, InterfaceType CommInterface >
121 class LinkBuilder;
122
124 // begin NonBlockingCommunication
126
127 class NonBlockingCommunication
128 {
129 typedef DependencyCache < BlockMapper > DependencyCacheType;
130
131#if HAVE_DUNE_ALUGRID
132 typedef MPAccessInterfaceType :: NonBlockingExchange NonBlockingExchange;
133
134 template <class DiscreteFunction>
135 class Pack : public NonBlockingExchange :: DataHandleIF
136 {
137 protected:
138 NonBlockingCommunication& commObj_;
139 const DiscreteFunction& discreteFunction_;
140
141 public:
142 Pack( NonBlockingCommunication& commObj, const DiscreteFunction& df )
143 : commObj_( commObj ), discreteFunction_( df )
144 {}
145
146 void pack( const int link, ObjectStreamType& buffer )
147 {
148 commObj_.pack( link, buffer, discreteFunction_ );
149 }
150
151 void unpack( const int link, ObjectStreamType& buffer )
152 {
153 DUNE_THROW(InvalidStateException,"Pack::unpack should not be called!");
154 }
155 };
156
157 template <class DiscreteFunction, class Operation>
158 class Unpack : public NonBlockingExchange :: DataHandleIF
159 {
160 protected:
161 NonBlockingCommunication& commObj_;
162 DiscreteFunction& discreteFunction_;
163
164 // communication operation (usually ADD or COPY)
165 const Operation operation_;
166
167 public:
168 Unpack( NonBlockingCommunication& commObj, DiscreteFunction& df )
169 : commObj_( commObj ), discreteFunction_( df ), operation_()
170 {}
171
172 void pack( const int link, ObjectStreamType& buffer )
173 {
174 DUNE_THROW(InvalidStateException,"Unpack::pack should not be called!");
175 }
176
177 void unpack( const int link, ObjectStreamType& buffer )
178 {
179 commObj_.unpack( link, buffer, discreteFunction_, operation_ );
180 }
181 };
182#else // ALUGRID_HAS_NONBLOCKING_COMM is false
183 typedef int NonBlockingExchange;
184#endif
185
186 // create an unique tag for the communication
187 DUNE_EXPORT static int getMessageTag()
188 {
189 enum { initial = 665 };
190 static int tagCounter = initial ;
191 ++ tagCounter;
192 int messageTag = tagCounter ;
193
194 // avoid overflow
195 if( messageTag < 0 )
196 {
197 messageTag = initial ;
198 tagCounter = initial ;
199 }
200 return messageTag;
201 }
202
203 public:
204 template <class Space>
205 NonBlockingCommunication( const Space& space,
206 DependencyCacheType& dependencyCache )
207 : dependencyCache_( dependencyCache ),
208 nonBlockingExchange_(),
209 buffer_(),
210 exchangeTime_( 0.0 ),
211 mySize_( space.gridPart().comm().size() )
212 {
213 // make sure cache is up2date
214 dependencyCache_.rebuild( space );
215
216 // notify dependency cache of open communication
217 dependencyCache_.attachComm();
218 }
219
220 // copy constructor
221 NonBlockingCommunication( const NonBlockingCommunication& other )
222 : dependencyCache_( other.dependencyCache_ ),
223 nonBlockingExchange_(),
224 buffer_(),
225 exchangeTime_( 0.0 ),
226 mySize_( other.mySize_ )
227 {
228 // notify dependency cache of open communication
229 dependencyCache_.attachComm();
230 }
231
232 ~NonBlockingCommunication()
233 {
234 // if this assertion fails some communication has not been finished
235 assert( ! nonBlockingExchange_ );
236 // notify dependency cache that comm is finished
237 dependencyCache_.detachComm() ;
238 }
239
240 template < class DiscreteFunctionSpace >
241 void send( const PetscDiscreteFunction< DiscreteFunctionSpace >& discreteFunction )
242 {
243 // nothing to do for the PetscDiscreteFunction here
244 }
245
246 template < class DiscreteFunction >
247 void send( const DiscreteFunction& discreteFunction )
248 {
249 // check that object is in non-sent state
250 assert( ! nonBlockingExchange_ );
251
252 // on serial runs: do nothing
253 if( mySize_ <= 1 ) return;
254
255 // take time
256 Dune::Timer sendTimer ;
257
258 // this variable can change during rebuild
259 const int nLinks = dependencyCache_.nlinks();
260
261 // resize buffer vector
262 buffer_.resize( nLinks );
263
264#if HAVE_DUNE_ALUGRID
265 // get non-blocking exchange object from mpAccess including message tag
266 nonBlockingExchange_.reset( dependencyCache_.mpAccess().nonBlockingExchange( getMessageTag() ) );
267
268 // pack data object
269 Pack< DiscreteFunction > packData( *this, discreteFunction );
270
271 // perform send operation including packing of data
272 nonBlockingExchange_->send( buffer_, packData );
273#else
274 // write buffers
275 for( int link = 0; link < nLinks; ++link )
276 pack( link, buffer_[ link ], discreteFunction );
277#endif
278
279 // store time needed for sending
280 exchangeTime_ = sendTimer.elapsed();
281 }
282
284 template < class DiscreteFunctionSpace, class Operation >
285 double receive( PetscDiscreteFunction< DiscreteFunctionSpace >& discreteFunction,
286 const Operation& operation )
287 {
288 // take time
289 Dune::Timer exchTimer;
290
291 // PetscDiscreteFunction has it's own communication
292 discreteFunction.dofVector().communicateNow( operation );
293
294 return exchTimer.elapsed();
295 }
296
298 template < class DiscreteFunction, class Operation >
299 double receive( DiscreteFunction& discreteFunction, const Operation& operation )
300 {
301 // on serial runs: do nothing
302 if( mySize_ <= 1 ) return 0.0;
303
304 // take time
305 Dune::Timer recvTimer ;
306
307#if HAVE_DUNE_ALUGRID
308 // unpack data object
309 Unpack< DiscreteFunction, Operation > unpackData( *this, discreteFunction );
310
311 // receive data and unpack
312 nonBlockingExchange_->receive( unpackData );
313#else
314 // use exchange for older ALUGrid versions (send and receive)
315 buffer_ = dependencyCache_.mpAccess().exchange( buffer_ );
316
317 // this variable can change during rebuild
318 const int nLinks = buffer_.size();
319
320 // read buffers and store to discrete function
321 for( int link = 0; link < nLinks; ++link )
322 unpack( link, buffer_[ link ], discreteFunction, operation );
323#endif
324
325 // store time needed for sending
326 exchangeTime_ += recvTimer.elapsed();
327
328#if HAVE_DUNE_ALUGRID
329 // clear nonBlockingExchange object
330 nonBlockingExchange_.reset();
331#endif
332 return exchangeTime_;
333 }
334
336 template < class DiscreteFunction >
337 double receive( DiscreteFunction& discreteFunction )
338 {
339 // get type of default operation
340 typedef typename DiscreteFunction :: DiscreteFunctionSpaceType
341 :: template CommDataHandle< DiscreteFunction > :: OperationType DefaultOperationType;
342 DefaultOperationType operation;
343 return receive( discreteFunction, operation );
344 }
345
346 protected:
347 template <class DiscreteFunction>
348 void pack( const int link, ObjectStreamType& buffer, const DiscreteFunction& discreteFunction )
349 {
350 // reset buffer counters
351 buffer.clear();
352 // write data of discrete function to message buffer
353 dependencyCache_.writeBuffer( link, buffer, discreteFunction );
354 }
355
356 template <class DiscreteFunction, class Operation>
357 void unpack( const int link, ObjectStreamType& buffer,
358 DiscreteFunction& discreteFunction, const Operation& operation )
359 {
360 // read data of discrete function from message buffer
361 dependencyCache_.readBuffer( link, buffer, discreteFunction, operation );
362 }
363
364 protected:
365 DependencyCacheType& dependencyCache_;
366 std::unique_ptr< NonBlockingExchange > nonBlockingExchange_ ;
367 ObjectStreamVectorType buffer_;
368 double exchangeTime_ ;
369 const int mySize_;
370 };
371
372 public:
373 typedef NonBlockingCommunication NonBlockingCommunicationType;
374
376 template <class Space>
377 NonBlockingCommunicationType nonBlockingCommunication( const Space& space )
378 {
379 // create non-blocking communication object
380 return NonBlockingCommunicationType( space, *this );
381 }
383 // end NonBlockingCommunication
385
387 DependencyCache( const int nProcs, const InterfaceType interface, const CommunicationDirection dir )
388 : interface_( interface ),
389 dir_( dir ),
390 linkStorage_(),
391 recvIndexMap_( nProcs ),
392 sendIndexMap_( nProcs ),
393 mpAccess_(),
394 exchangeTime_( 0.0 ),
395 buildTime_( 0.0 ),
396 sequence_( -1 ),
397 nonBlockingObjects_( 0 )
398 {
399 }
400
401 template <class Communication>
402 void init( const Communication& comm )
403 {
404 if( ! mpAccess_ )
405 {
406 mpAccess_.reset( new MPAccessImplType( comm ) );
407 }
408 }
409
410 // no copying
411 DependencyCache( const DependencyCache & ) = delete;
412
414 InterfaceType communicationInterface() const
415 {
416 return interface_;
417 }
418
420 CommunicationDirection communicationDirection() const
421 {
422 return dir_;
423 }
424
426 double buildTime() const
427 {
428 return buildTime_;
429 }
430
432 double exchangeTime() const
433 {
434 return exchangeTime_;
435 }
436
437 // notify for open non-blocking communications
438 void attachComm()
439 {
440 ++nonBlockingObjects_;
441 }
442
443 // notify for finished non-blocking communication
444 void detachComm()
445 {
446 --nonBlockingObjects_;
447 assert( nonBlockingObjects_ >= 0 );
448 }
449
450 bool noOpenCommunications() const
451 {
452 return true ;
453 }
454
455 protected:
456 // build linkage and index maps
457 template < class Space >
458 inline void buildMaps( const Space& space );
459
460 // check consistency of maps
461 inline void checkConsistency();
462
463 template< class Space, class Comm, class LS, class IMV, InterfaceType CI >
464 inline void buildMaps( const Space& space, LinkBuilder< Comm, LS, IMV, CI > &handle );
465
466 public:
468 inline int dest( const int link ) const
469 {
470 return mpAccess().dest()[ link ];
471 }
472
474 inline int nlinks() const
475 {
476 return mpAccess().nlinks();
477 }
478
482 template <class Space>
483 inline void rebuild( const Space& space )
484 {
485 const auto& comm = space.gridPart().comm();
486 const int spcSequence = space.sequence();
487
488 // only in parallel we have to do something
489 if( comm.size() <= 1 ) return;
490
491 // make sure all non-blocking communications have been finished by now
492 assert( noOpenCommunications() );
493#ifndef NDEBUG
494 // make sure buildMaps is called on every process
495 // otherwise the programs wait here until forever
496 int willRebuild = (sequence_ != spcSequence) ? 1 : 0;
497 const int myRebuild = willRebuild;
498
499 // send willRebuild from rank 0 to all
500 comm.broadcast( &willRebuild, 1 , 0);
501
502 assert( willRebuild == myRebuild );
503#endif
504
505 // check whether grid has changed.
506 if( sequence_ != spcSequence )
507 {
508 // take timer needed for rebuild
509 Dune::Timer buildTime;
510
511 // rebuild maps holding exchange dof information
512 buildMaps( space );
513 // update sequence number
514 sequence_ = spcSequence;
515
516 // store time needed
517 buildTime_ = buildTime.elapsed();
518 }
519 }
520
522 template< class DiscreteFunction, class Operation >
523 inline void exchange( DiscreteFunction &discreteFunction, const Operation& operation );
524
526 template< class DiscreteFunction >
527 inline void writeBuffer( ObjectStreamVectorType &osv, const DiscreteFunction &discreteFunction ) const;
528
530 template< class DiscreteFunctionType, class Operation >
531 inline void readBuffer( ObjectStreamVectorType &osv,
532 DiscreteFunctionType &discreteFunction,
533 const Operation& operation ) const;
534
536 inline MPAccessInterfaceType &mpAccess()
537 {
538 assert( mpAccess_ );
539 return *mpAccess_;
540 }
541
543 inline const MPAccessInterfaceType &mpAccess() const
544 {
545 assert( mpAccess_ );
546 return *mpAccess_;
547 }
548
549 protected:
550 // specialization for PetscDiscreteFunction doing nothing
551 template< class DiscreteFunctionSpace >
552 inline void writeBuffer( const int link,
553 ObjectStreamType &str,
554 const PetscDiscreteFunction< DiscreteFunctionSpace > &discreteFunction ) const
555 {
556 DUNE_THROW(NotImplemented,"writeBuffer not implemented for PetscDiscteteFunction" );
557 }
558
559 // write data of DataImp& vector to object stream
560 // --writeBuffer
561 template< class DiscreteFunction >
562 inline void writeBuffer( const int link,
563 ObjectStreamType &str,
564 const DiscreteFunction &discreteFunction ) const
565 {
566 assert( sequence_ == discreteFunction.space().sequence() );
567 const auto &indexMap = sendIndexMap_[ dest( link ) ];
568 const int size = indexMap.size();
569
570 typedef typename DiscreteFunction :: DofType DofType;
571
572 // reserve write buffer for storage of dofs
573 typename DiscreteFunction::DiscreteFunctionSpaceType::LocalBlockIndices localBlockIndices;
574 str.reserve( size * Hybrid::size( localBlockIndices ) * sizeof( DofType ) );
575 for( int i = 0; i < size; ++i )
576 {
577 const auto &block = discreteFunction.dofVector()[ indexMap[ i ] ];
578 Hybrid::forEach( localBlockIndices, [ &str, &block ] ( auto &&k ) { str.writeUnchecked( block[ k ] ); } );
579 }
580 }
581
582 // read data from object stream to DataImp& data vector
583 // specialization for PetscDiscreteFunction doing nothing
584 template< class DiscreteFunctionSpace, class Operation >
585 inline void readBuffer( const int link,
586 ObjectStreamType &str,
587 PetscDiscreteFunction< DiscreteFunctionSpace > &discreteFunction,
588 const Operation& ) const
589 {
590 DUNE_THROW(NotImplemented,"readBuffer not implemented for PetscDiscteteFunction" );
591 }
592
593 // read data from object stream to DataImp& data vector
594 // --readBuffer
595 template< class DiscreteFunction, class Operation >
596 inline void readBuffer( const int link,
597 ObjectStreamType &str,
598 DiscreteFunction &discreteFunction,
599 const Operation& operation ) const
600 {
601 static_assert( ! std::is_pointer< Operation > :: value,
602 "DependencyCache::readBuffer: Operation needs to be a reference!");
603
604 assert( sequence_ == discreteFunction.space().sequence() );
605 typedef typename DiscreteFunction :: DofType DofType;
606
607 // get index map of rank belonging to link
608 const auto &indexMap = recvIndexMap_[ dest( link ) ];
609
610 const int size = indexMap.size();
611 // make sure that the receive buffer has the correct size
612 typename DiscreteFunction::DiscreteFunctionSpaceType::LocalBlockIndices localBlockIndices;
613 assert( static_cast< std::size_t >( size * Hybrid::size( localBlockIndices ) * sizeof( DofType ) ) <= static_cast< std::size_t >( str.size() ) );
614 for( int i = 0; i < size; ++i )
615 {
616 auto &&block = discreteFunction.dofVector()[ indexMap[ i ] ];
617 Hybrid::forEach( localBlockIndices, [ &str, &operation, &block ] ( auto &&k ) {
618 DofType value;
619#if HAVE_DUNE_ALUGRID
620 str.readUnchecked( value );
621#else // #if HAVE_DUNE_ALUGRID
622 str.read( value );
623#endif // #else // #if HAVE_DUNE_ALUGRID
624 // apply operation, i.e. COPY, ADD, etc.
625 operation( value, block[ k ] );
626 } );
627 }
628 }
629 };
630
631 // --LinkBuilder
632 template< class BlockMapper >
633 template< class Communication, class LinkStorage, class IndexMapVector, InterfaceType CommInterface >
634 class DependencyCache< BlockMapper > :: LinkBuilder
635 : public CommDataHandleIF
636 < LinkBuilder< Communication, LinkStorage, IndexMapVector, CommInterface >,
637 typename BlockMapper :: GlobalKeyType >
638 {
639 public:
640 typedef Communication CommunicationType;
641 typedef BlockMapper BlockMapperType;
642
643 typedef typename BlockMapperType :: GlobalKeyType GlobalKeyType;
644
645 typedef LinkStorage LinkStorageType;
646 typedef IndexMapVector IndexMapVectorType;
647
648 typedef GlobalKeyType DataType;
649
650 protected:
651 const CommunicationType& comm_;
652 const BlockMapperType &blockMapper_;
653
654 const GlobalKeyType myRank_;
655 const GlobalKeyType mySize_;
656
657 LinkStorageType &linkStorage_;
658
659 IndexMapVectorType &sendIndexMap_;
660 IndexMapVectorType &recvIndexMap_;
661
662
663 public:
664 LinkBuilder( const CommunicationType& comm,
665 const BlockMapperType& blockMapper,
666 LinkStorageType &linkStorage,
667 IndexMapVectorType &sendIdxMap,
668 IndexMapVectorType &recvIdxMap )
669 : comm_( comm ),
670 blockMapper_( blockMapper ),
671 myRank_( comm.rank() ),
672 mySize_( comm.size() ),
673 linkStorage_( linkStorage ),
674 sendIndexMap_( sendIdxMap ),
675 recvIndexMap_( recvIdxMap )
676 {}
677
678 protected:
679 void sendBackSendMaps()
680 {
681 // create ALU communicator
682 MPAccessImplType mpAccess( comm_ );
683
684 // build linkage
685 mpAccess.removeLinkage();
686 // insert new linkage
687 mpAccess.insertRequestSymetric( linkStorage_ );
688 // get destination ranks
689 std::vector<int> dest = mpAccess.dest();
690 // get number of links
691 const int nlinks = mpAccess.nlinks();
692
693 // create buffers
694 ObjectStreamVectorType osv( nlinks );
695
697 //
698 // at this point complete send maps exsist on receiving side,
699 // so send them back to sending side
700 //
702
703 // write all send maps to buffer
704 for(int link=0; link<nlinks; ++link)
705 sendIndexMap_[ dest[link] ].writeToBuffer( osv[link] );
706
707 // exchange data
708 osv = mpAccess.exchange( osv );
709
710 // read all send maps from buffer
711 for(int link=0; link<nlinks; ++link)
712 sendIndexMap_[ dest[link] ].readFromBuffer( osv[link] );
713 }
714
715 public:
717 ~LinkBuilder()
718 {
719 sendBackSendMaps();
720 }
721
723 bool contains( int dim, int codim ) const
724 {
725 return blockMapper_.contains( codim );
726 }
727
729 bool fixedSize( int dim, int codim ) const
730 {
731 return false;
732 }
733
735 template< class MessageBuffer, class Entity >
736 void gather( MessageBuffer &buffer, const Entity &entity ) const
737 {
738 // check whether we are a sending entity
739 const auto myPartitionType = entity.partitionType();
740 const bool send = EntityCommHelper< CommInterface > :: send( myPartitionType );
741
742 // if we send data then send rank and dofs
743 if( send )
744 {
745 // send rank for linkage
746 buffer.write( myRank_ );
747
748 const int numDofs = blockMapper_.numEntityDofs( entity );
749
750 typedef std::vector< GlobalKeyType > IndicesType ;
751 IndicesType indices( numDofs );
752
753 // copy all global keys
754 blockMapper_.mapEachEntityDof( entity, AssignFunctor< IndicesType >( indices ) );
755
756 // write global keys to message buffer
757 for( int i = 0; i < numDofs; ++i )
758 buffer.write( indices[ i ] );
759 }
760 }
761
763 template< class MessageBuffer, class Entity >
764 void scatter( MessageBuffer &buffer, const Entity &entity, const size_t dataSize )
765 {
766 // if data size > 0 then other side is sender
767 if( dataSize > 0 )
768 {
769 // read rank of other side
770 GlobalKeyType rank;
771 buffer.read( rank );
772 assert( (rank >= 0) && (rank < mySize_) );
773
774 // check whether we are a sending entity
775 const auto myPartitionType = entity.partitionType();
776 const bool receive = EntityCommHelper< CommInterface > :: receive( myPartitionType );
777
778 // insert rank of link into set of links
779 linkStorage_.insert( rank );
780
781 // read indices from stream
782 typedef std::vector< GlobalKeyType > IndicesType ;
783 IndicesType indices( dataSize - 1 );
784 for(size_t i=0; i<dataSize-1; ++i)
785 buffer.read( indices[i] );
786
787 // if we are a receiving entity
788 if( receive )
789 {
791 //
792 // Problem here: sending and receiving order might differ
793 // Solution: sort all dofs after receiving order and send
794 // senders dofs back at the end
795 //
797
798 // if data has been send and we are receive entity
799 // then insert indices into send map of rank
800 sendIndexMap_[ rank ].insert( indices );
801
802 // build local mapping for receiving of dofs
803 const int numDofs = blockMapper_.numEntityDofs( entity );
804 indices.resize( numDofs );
805
806 // map each entity dof and store in indices
807 blockMapper_.mapEachEntityDof( entity, AssignFunctor< IndicesType >( indices ) );
808
809 // insert receiving dofs
810 recvIndexMap_[ rank ].insert( indices );
811 }
812 }
813 }
814
816 template< class Entity >
817 size_t size( const Entity &entity ) const
818 {
819 const PartitionType myPartitionType = entity.partitionType();
820 const bool send = EntityCommHelper< CommInterface > :: send( myPartitionType );
821 return (send) ? (blockMapper_.numEntityDofs( entity ) + 1) : 0;
822 }
823 };
824
825
826
827 template< class BlockMapper >
828 template< class Space >
829 inline void DependencyCache< BlockMapper > :: buildMaps( const Space& space )
830 {
831 typedef typename Space::GridPartType::CollectiveCommunicationType CommunicationType;
832 if( interface_ == InteriorBorder_All_Interface )
833 {
834 LinkBuilder< CommunicationType, LinkStorageType, IndexMapVectorType,
835 InteriorBorder_All_Interface >
836 handle( space.gridPart().comm(),
837 space.blockMapper(),
838 linkStorage_, sendIndexMap_, recvIndexMap_ );
839 buildMaps( space, handle );
840 }
841 else if( interface_ == InteriorBorder_InteriorBorder_Interface )
842 {
843 LinkBuilder< CommunicationType, LinkStorageType, IndexMapVectorType,
844 InteriorBorder_InteriorBorder_Interface >
845 handle( space.gridPart().comm(),
846 space.blockMapper(),
847 linkStorage_, sendIndexMap_, recvIndexMap_ );
848 buildMaps( space, handle );
849 }
850 else if( interface_ == All_All_Interface )
851 {
852 LinkBuilder< CommunicationType, LinkStorageType, IndexMapVectorType, All_All_Interface >
853 handle( space.gridPart().comm(),
854 space.blockMapper(),
855 linkStorage_, sendIndexMap_, recvIndexMap_ );
856 buildMaps( space, handle );
857 }
858 else
859 DUNE_THROW( NotImplemented, "DependencyCache for the given interface has not been implemented, yet." );
860#ifndef NDEBUG
861 // checks that sizes of index maps are equal on sending and receiving proc
862 checkConsistency();
863#endif
864 }
865
866
867 template< class BlockMapper >
868 template< class Space, class Comm, class LS, class IMV, InterfaceType CI >
869 inline void DependencyCache< BlockMapper >
870 :: buildMaps( const Space& space, LinkBuilder< Comm, LS, IMV, CI > &handle )
871 {
872 linkStorage_.clear();
873 const size_t size = recvIndexMap_.size();
874 for( size_t i = 0; i < size; ++i )
875 {
876 recvIndexMap_[ i ].clear();
877 sendIndexMap_[ i ].clear();
878 }
879
880 // make one all to all communication to build up communication pattern
881 space.gridPart().communicate( handle, All_All_Interface , ForwardCommunication );
882
883 // remove old linkage
884 mpAccess().removeLinkage();
885 // create new linkage
886 mpAccess().insertRequestSymetric( linkStorage_ );
887 }
888
889 template< class BlockMapper >
890 inline void DependencyCache< BlockMapper > :: checkConsistency()
891 {
892 const int nLinks = nlinks();
893
894 ObjectStreamVectorType buffer( nLinks );
895
896 // check that order and size are consistent
897 for(int l=0; l<nLinks; ++l)
898 {
899 buffer[l].clear();
900 const int sendSize = sendIndexMap_[ dest( l ) ].size();
901 buffer[l].write( sendSize );
902 for(int i=0; i<sendSize; ++i)
903 buffer[l].write( i );
904 }
905
906 // exchange data to other procs
907 buffer = mpAccess().exchange( buffer );
908
909 // check that order and size are consistent
910 for(int l=0; l<nLinks; ++l)
911 {
912 const int recvSize = recvIndexMap_[ dest( l ) ].size();
913 int sendedSize;
914 buffer[l].read( sendedSize );
915
916 // compare sizes, must be the same
917 if( recvSize != sendedSize )
918 {
919 DUNE_THROW(InvalidStateException,"Sizes do not match!" << sendedSize << " o|r " << recvSize);
920 }
921
922 for(int i=0; i<recvSize; ++i)
923 {
924 int idx;
925 buffer[l].read( idx );
926
927 // ordering should be the same on both sides
928 if( i != idx )
929 {
930 DUNE_THROW(InvalidStateException,"Wrong ordering of send and recv maps!");
931 }
932 }
933 }
934 }
935
936 template< class BlockMapper >
937 template< class DiscreteFunction, class Operation >
938 inline void DependencyCache< BlockMapper >
939 :: exchange( DiscreteFunction &discreteFunction, const Operation& operation )
940 {
941 const auto& space = discreteFunction.space();
942
943 // on serial runs: do nothing
944 if( space.gridPart().comm().size() <= 1 ) return;
945
946 // create non-blocking communication object
947 NonBlockingCommunicationType nbc( space, *this );
948
949 // perform send operation
950 nbc.send( discreteFunction );
951
952 // store time for send and receive of data
953 exchangeTime_ = nbc.receive( discreteFunction, operation );
954 }
955
956 template< class BlockMapper >
957 template< class DiscreteFunction >
958 inline void DependencyCache< BlockMapper >
959 :: writeBuffer( ObjectStreamVectorType &osv,
960 const DiscreteFunction &discreteFunction ) const
961 {
962 const int numLinks = nlinks();
963 for( int link = 0; link < numLinks; ++link )
964 writeBuffer( link, osv[ link ], discreteFunction );
965 }
966
967 template< class BlockMapper >
968 template< class DiscreteFunction, class Operation >
969 inline void DependencyCache< BlockMapper >
970 :: readBuffer( ObjectStreamVectorType &osv,
971 DiscreteFunction &discreteFunction,
972 const Operation& operation ) const
973 {
974 const int numLinks = nlinks();
975 for( int link = 0; link < numLinks; ++link )
976 readBuffer( link, osv[ link ], discreteFunction, operation );
977 }
978
980 template < class BlockMapper >
981 class CommManagerSingletonKey
982 {
983 const BlockMapper& blockMapper_;
984 const InterfaceType interface_;
985 const CommunicationDirection dir_;
986 const int pSize_;
987 public:
989 CommManagerSingletonKey(const int pSize,
990 const BlockMapper& blockMapper,
991 const InterfaceType interface,
992 const CommunicationDirection dir)
993 : blockMapper_( blockMapper ),
994 interface_(interface), dir_(dir), pSize_( pSize )
995 {}
996
998 CommManagerSingletonKey(const CommManagerSingletonKey & org) = default;
999
1001 bool operator == (const CommManagerSingletonKey & otherKey) const
1002 {
1003 // block mapper of space is either singleton or the pointers differ anyway
1004 return (&(blockMapper_) == &(otherKey.blockMapper_) );
1005 }
1006
1008 InterfaceType interface() const
1009 {
1010 return interface_;
1011 }
1012
1014 CommunicationDirection direction() const
1015 {
1016 return dir_;
1017 }
1018
1020 int pSize () const { return pSize_; }
1021 };
1022
1025 template <class KeyImp, class ObjectImp>
1026 class CommManagerFactory
1027 {
1028 public:
1030 static ObjectImp * createObject( const KeyImp & key )
1031 {
1032 return new ObjectImp(key.pSize(), key.interface(), key.direction());
1033 }
1034
1036 static void deleteObject( ObjectImp * obj )
1037 {
1038 delete obj;
1039 }
1040 };
1041
1043 template <class SpaceImp>
1044 class CommunicationManager
1045 {
1046 typedef CommunicationManager<SpaceImp> ThisType;
1047
1048 typedef typename SpaceImp::BlockMapperType BlockMapperType;
1049
1050 // type of communication manager object which does communication
1051 typedef DependencyCache< BlockMapperType > DependencyCacheType;
1052
1053 typedef CommManagerSingletonKey< BlockMapperType > KeyType;
1054 typedef CommManagerFactory<KeyType, DependencyCacheType> FactoryType;
1055
1056 typedef SingletonList< KeyType , DependencyCacheType , FactoryType > CommunicationProviderType;
1057
1058 typedef SpaceImp SpaceType;
1059 const SpaceType& space_;
1060
1061 typedef ALU3DSPACE MpAccessLocal MPAccessInterfaceType;
1062
1063 // is singleton per block mapper (spaces can differ)
1064 std::unique_ptr< DependencyCacheType, typename CommunicationProviderType::Deleter > cache_;
1065
1066 // copy constructor
1067 CommunicationManager(const ThisType& org) = delete;
1068 public:
1069 // type of non-blocking communication object
1070 typedef typename DependencyCacheType :: NonBlockingCommunicationType NonBlockingCommunicationType;
1071
1073 CommunicationManager(const SpaceType& space,
1074 const InterfaceType interface,
1075 const CommunicationDirection dir)
1076 : space_( space ) // my space which should have a longer life time than
1077 // this communicator since the communicator is created inside the space
1078 , cache_( &CommunicationProviderType::getObject(
1079 KeyType( space.gridPart().comm().size(), space_.blockMapper(), interface,dir) ) )
1080 {
1081 // pass communication on to dependency cache
1082 cache().init( space.gridPart().comm() );
1083
1084 //std::cout << "P["<< space.gridPart().comm().rank() <<"] CommunicationManager: created and got cache " << &cache_ << std::endl;
1085 }
1086
1088 CommunicationManager(const SpaceType& space)
1089 : CommunicationManager( space, space.communicationInterface(), space.communicationDirection() )
1090 {}
1091
1092 DependencyCacheType& cache () const { assert( cache_ ); return *cache_; }
1093
1095 InterfaceType communicationInterface() const
1096 {
1097 return cache().communicationInterface();
1098 }
1099
1101 CommunicationDirection communicationDirection() const
1102 {
1103 return cache().communicationDirection();
1104 }
1105
1107 double buildTime() const
1108 {
1109 return cache().buildTime();
1110 }
1111
1113 double exchangeTime() const
1114 {
1115 return cache().exchangeTime();
1116 }
1117
1118 MPAccessInterfaceType& mpAccess()
1119 {
1120 return cache().mpAccess();
1121 }
1122
1125 {
1126 return cache().nonBlockingCommunication( space_ );
1127 }
1128
1131 template <class DiscreteFunctionType>
1132 void exchange(DiscreteFunctionType & df) const
1133 {
1134 // get type of default operation
1135 typedef typename DiscreteFunctionType :: DiscreteFunctionSpaceType
1136 :: template CommDataHandle< DiscreteFunctionType > :: OperationType DefaultOperationType;
1137
1138 // create default operation
1139 DefaultOperationType operation;
1140
1141 exchange( df, operation );
1142 }
1143
1146 template <class DiscreteFunctionType, class Operation>
1147 void exchange(DiscreteFunctionType & df, const Operation& operation ) const
1148 {
1149 cache().exchange( df, operation );
1150 }
1151
1153 template <class ObjectStreamVectorType, class DiscreteFunctionType>
1154 void writeBuffer(ObjectStreamVectorType& osv, const DiscreteFunctionType & df) const
1155 {
1156 cache().writeBuffer( osv, df );
1157 }
1158
1159 // read given df from given buffer
1160 template <class ObjectStreamVectorType, class DiscreteFunctionType>
1161 void readBuffer(ObjectStreamVectorType& osv, DiscreteFunctionType & df) const
1162 {
1163 typedef typename DiscreteFunctionType :: DiscreteFunctionSpaceType
1164 :: template CommDataHandle<DiscreteFunctionType> :: OperationType OperationType;
1165
1166 // communication operation to be performed on the received data
1167 OperationType operation;
1168
1169 readBuffer( osv, df, operation );
1170 }
1171
1172 // read given df from given buffer
1173 template <class ObjectStreamVectorType, class DiscreteFunctionType, class OperationType>
1174 void readBuffer(ObjectStreamVectorType& osv, DiscreteFunctionType & df, const OperationType& operation) const
1175 {
1176 cache().readBuffer( osv, df , operation);
1177 }
1178
1180 void rebuildCache()
1181 {
1182 cache().rebuild( space_ );
1183 }
1184 };
1185
1187 class CommunicationManagerList
1188 {
1190 template <class MPAccessType, class ObjectStreamVectorType>
1191 class DiscreteFunctionCommunicatorInterface
1192 {
1193 protected:
1195 {}
1196 public:
1198 {}
1199
1200 virtual MPAccessType& mpAccess() = 0;
1201 virtual void writeBuffer(ObjectStreamVectorType&) const = 0;
1202 virtual void readBuffer(ObjectStreamVectorType&) = 0;
1203 virtual void rebuildCache() = 0;
1204
1205 virtual bool handles ( IsDiscreteFunction &df ) const = 0;
1206 };
1207
1211 template <class DiscreteFunctionImp,
1212 class MPAccessType,
1213 class ObjectStreamVectorType,
1214 class OperationType >
1215 class DiscreteFunctionCommunicator
1216 : public DiscreteFunctionCommunicatorInterface<MPAccessType,ObjectStreamVectorType>
1217 {
1218 typedef DiscreteFunctionImp DiscreteFunctionType;
1219 typedef typename DiscreteFunctionType :: DiscreteFunctionSpaceType DiscreteFunctionSpaceType;
1220
1221 typedef CommunicationManager<DiscreteFunctionSpaceType> CommunicationManagerType;
1222
1223 // object to communicate
1224 DiscreteFunctionType& df_;
1226 CommunicationManagerType comm_;
1227
1229 const OperationType operation_;
1230 public:
1232 DiscreteFunctionCommunicator(DiscreteFunctionType& df, const OperationType& op )
1233 : df_(df), comm_(df_.space()), operation_( op )
1234 {}
1235
1237 virtual MPAccessType& mpAccess()
1238 {
1239 return comm_.mpAccess();
1240 }
1241
1243 virtual void writeBuffer(ObjectStreamVectorType& osv) const
1244 {
1245 comm_.writeBuffer(osv,df_);
1246 }
1247
1249 virtual void readBuffer(ObjectStreamVectorType& osv)
1250 {
1251 comm_.readBuffer(osv, df_, operation_ );
1252 }
1253
1255 virtual void rebuildCache()
1256 {
1257 comm_.rebuildCache();
1258 }
1259
1260 virtual bool handles ( IsDiscreteFunction &df ) const { return (&static_cast< IsDiscreteFunction & >( df_ ) == &df); }
1261 };
1262
1263 // ALUGrid send/recv buffers
1264 typedef ALU3DSPACE ObjectStream ObjectStreamType;
1265
1266 // type of buffer vector
1267 typedef std::vector< ObjectStreamType > ObjectStreamVectorType;
1268
1269 // type of ALUGrid Communicator
1270 typedef ALU3DSPACE MpAccessLocal MPAccessInterfaceType;
1271
1272 // interface for communicated objects
1273 typedef DiscreteFunctionCommunicatorInterface<MPAccessInterfaceType,ObjectStreamVectorType>
1274 CommObjInterfaceType;
1275
1276 // list of communicated objects
1277 typedef std::list < std::unique_ptr< CommObjInterfaceType > > CommObjListType;
1278 CommObjListType objList_;
1279
1280 // number of processors
1281 int mySize_;
1282
1283 public:
1285 template <class CombinedObjectType>
1286 CommunicationManagerList(CombinedObjectType& cObj) :
1287 mySize_( -1 )
1288 {
1289 // add all discrete functions containd in cObj to list
1290 cObj.addToList(*this);
1291 }
1292
1295 : mySize_( -1 )
1296 {}
1297
1299
1301 template <class DiscreteFunctionImp, class Operation>
1302 void addToList(DiscreteFunctionImp &df, const Operation& operation )
1303 {
1304 // type of communication object
1305 typedef DiscreteFunctionCommunicator<DiscreteFunctionImp,
1306 MPAccessInterfaceType,
1307 ObjectStreamVectorType,
1308 Operation > CommObj;
1309 CommObj * obj = new CommObj(df, operation);
1310 objList_.push_back( std::unique_ptr< CommObjInterfaceType > (obj) );
1311
1312 // if mySize wasn't set, set to number of processors
1313 if( mySize_ < 0 )
1314 {
1315 // get ALUGrid communicator
1316 MPAccessInterfaceType& mpAccess = objList_.front()->mpAccess();
1317
1318 // set number of processors
1319 mySize_ = mpAccess.psize();
1320 }
1321 }
1322
1324 template <class DiscreteFunctionImp>
1325 void addToList(DiscreteFunctionImp &df)
1326 {
1327 DFCommunicationOperation::Copy operation;
1328 addToList( df, operation );
1329 }
1330
1331 template< class DiscreteFunction >
1332 void removeFromList ( DiscreteFunction &df )
1333 {
1334 const auto handles = [ &df ] ( const std::unique_ptr< CommObjInterfaceType > &commObj ) { assert( commObj ); return commObj->handles( df ); };
1335 CommObjListType::reverse_iterator pos = std::find_if( objList_.rbegin(), objList_.rend(), handles );
1336 if( pos != objList_.rend() )
1337 objList_.erase( --pos.base() );
1338 else
1339 DUNE_THROW( RangeError, "Trying to remove discrete function that was never added" );
1340 }
1341
1344 void exchange()
1345 {
1346 // if only one process, do nothing
1347 if( mySize_ <= 1 ) return ;
1348
1349 // exchange data
1350 if(objList_.size() > 0)
1351 {
1352 // rebuild cahce if grid has changed
1353 for(auto& elem : objList_)
1354 elem->rebuildCache();
1355
1356 // get ALUGrid communicator
1357 auto& mpAccess = objList_.front()->mpAccess();
1358
1359 // create buffer
1360 ObjectStreamVectorType osv( mpAccess.nlinks() );
1361
1362 // fill buffers
1363 for(auto& elem : objList_)
1364 elem->writeBuffer(osv);
1365
1366 // exchange data
1367 osv = mpAccess.exchange(osv);
1368
1369 // read buffers
1370 for(auto& elem : objList_)
1371 elem->readBuffer(osv);
1372 }
1373 }
1374 };
1375#endif // #if ALU3DGRID_PARALLEL
1377
1378 } // namespace Fem
1379
1380} // namespace Dune
1381#endif // #ifndef DUNE_FEM_CACHED_COMMUNICATION_MANAGER_HH
const SpaceType & space_
Definition: communicationmanager.hh:165
double buildTime() const
return time needed for last build
Definition: communicationmanager.hh:203
InterfaceType communicationInterface() const
return communication interface
Definition: communicationmanager.hh:189
DefaultCommunicationManager< SpaceImp > ThisType
Definition: communicationmanager.hh:82
SpaceImp SpaceType
Definition: communicationmanager.hh:79
void exchange() const
Definition: communicationmanager.hh:377
void exchange(DiscreteFunction &discreteFunction) const
exchange data for a discrete function using the copy operation
Definition: communicationmanager.hh:225
CommunicationDirection communicationDirection() const
return communication direction
Definition: communicationmanager.hh:194
NonBlockingCommunication NonBlockingCommunicationType
Definition: communicationmanager.hh:173
NonBlockingCommunicationType nonBlockingCommunication() const
return object for non-blocking communication
Definition: communicationmanager.hh:215
bool handles(IsDiscreteFunction &df) const
Definition: communicationmanager.hh:329
double exchangeTime() const
return time needed for last exchange of data
Definition: communicationmanager.hh:209
void addToList(DiscreteFunctionImp &df, const Operation &operation)
add discrete function to communication list
Definition: communicationmanager.hh:349
void removeFromList(DiscreteFunction &df)
Definition: communicationmanager.hh:365
Definition: bindguard.hh:11
base class for determing whether a class is a discrete function or not
Definition: common/discretefunction.hh:53
Definition: cachedcommmanager.hh:47
Definition: commindexmap.hh:16