diff --git a/source/adios2/CMakeLists.txt b/source/adios2/CMakeLists.txt
index daa921003b..542085b050 100644
--- a/source/adios2/CMakeLists.txt
+++ b/source/adios2/CMakeLists.txt
@@ -33,6 +33,7 @@ add_library(adios2_core
   helper/adiosNetwork.cpp
   helper/adiosPartitioner.cpp
   helper/adiosPluginManager.cpp
+  helper/adiosRerouting.cpp
  helper/adiosString.cpp helper/adiosString.tcc
  helper/adiosSystem.cpp
  helper/adiosType.cpp
@@ -57,6 +58,7 @@ add_library(adios2_core
   engine/bp5/BP5Writer.tcc
   engine/bp5/BP5Writer_TwoLevelShm_Async.cpp
   engine/bp5/BP5Writer_TwoLevelShm.cpp
+  engine/bp5/BP5Writer_WithRerouting.cpp
  engine/timeseries/TimeSeriesReader.cpp
  engine/timeseries/TimeSeriesReader.tcc
diff --git a/source/adios2/engine/bp5/BP5Engine.h b/source/adios2/engine/bp5/BP5Engine.h
index 1f72f10a0a..6d18689081 100644
--- a/source/adios2/engine/bp5/BP5Engine.h
+++ b/source/adios2/engine/bp5/BP5Engine.h
@@ -150,7 +150,8 @@ class BP5Engine
     MACRO(DirectIO, Bool, bool, false)                                                             \
     MACRO(DirectIOAlignOffset, UInt, unsigned int, 512)                                            \
     MACRO(DirectIOAlignBuffer, UInt, unsigned int, 0)                                              \
-    MACRO(AggregationType, AggregationType, int, (int)AggregationType::TwoLevelShm)                \
+    MACRO(AggregationType, AggregationType, int, (int)AggregationType::DataSizeBased)              \
+    MACRO(EnableWriterRerouting, Bool, bool, true)                                                 \
     MACRO(AsyncOpen, Bool, bool, true)                                                             \
     MACRO(AsyncWrite, AsyncWrite, int, (int)AsyncWrite::Sync)                                      \
     MACRO(GrowthFactor, Float, float, DefaultBufferGrowthFactor)                                   \
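With the defaults above, BP5 now selects DataSizeBased aggregation with writer
rerouting enabled. Both knobs remain ordinary per-IO engine parameters; a
minimal application-side sketch (parameter names from the macro table above,
everything else is standard ADIOS2 boilerplate, not part of this patch):

    adios2::ADIOS adios(MPI_COMM_WORLD);
    adios2::IO io = adios.DeclareIO("Output");
    io.SetEngine("BP5");
    io.SetParameter("AggregationType", "DataSizeBased");
    io.SetParameter("EnableWriterRerouting", "true"); // "false" opts out
    adios2::Engine writer = io.Open("out.bp", adios2::Mode::Write);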
diff --git a/source/adios2/engine/bp5/BP5Writer.cpp b/source/adios2/engine/bp5/BP5Writer.cpp
index bda55bea49..8ce1ecd4e2 100644
--- a/source/adios2/engine/bp5/BP5Writer.cpp
+++ b/source/adios2/engine/bp5/BP5Writer.cpp
@@ -115,15 +115,15 @@ helper::RankPartition BP5Writer::GetPartitionInfo(const uint64_t rankDataSize, c
     m_Profiler.AddTimerWatch("PartitionRanks");
     m_Profiler.Start("PartitionRanks");
 
-    helper::Partitioning partitioning = helper::PartitionRanks(allsizes, numPartitions);
+    m_Partitioning = helper::PartitionRanks(allsizes, numPartitions);
 
     m_Profiler.Stop("PartitionRanks");
 
     if (parentRank == 0 && m_Parameters.verbose > 0)
     {
-        partitioning.PrintSummary();
+        m_Partitioning.PrintSummary();
     }
 
-    return partitioning.FindPartition(parentRank);
+    return m_Partitioning.FindPartition(parentRank);
 }
 
 StepStatus BP5Writer::BeginStep(StepMode mode, const float timeoutSeconds)
@@ -390,9 +390,32 @@ void BP5Writer::WriteData(format::BufferV *Data)
         WriteData_EveryoneWrites(Data, false);
         break;
     case (int)AggregationType::EveryoneWritesSerial:
-    case (int)AggregationType::DataSizeBased:
         WriteData_EveryoneWrites(Data, true);
         break;
+    case (int)AggregationType::DataSizeBased:
+        // First initialize aggregator and transports if we haven't done it yet this step
+        if (!m_AggregatorInitializedThisStep)
+        {
+            // We can't allow ranks to change subfiles between calls to Put(), so we only
+            // do this initialization once per timestep. Consequently, the partition decision
+            // could be based on incomplete step data.
+            InitAggregator(Data->Size());
+            InitTransports();
+            m_AggregatorInitializedThisStep = true;
+        }
+
+        // For rerouting to be useful, there must be multiple writers sending
+        // data to multiple subfiles.
+        if (m_Parameters.EnableWriterRerouting && m_Comm.Size() > 1 &&
+            m_Aggregator->m_SubStreams > 1)
+        {
+            WriteData_WithRerouting(Data);
+        }
+        else
+        {
+            WriteData_EveryoneWrites(Data, true);
+        }
+        break;
     case (int)AggregationType::TwoLevelShm:
         WriteData_TwoLevelShm(Data);
         break;
@@ -410,19 +433,6 @@ void BP5Writer::WriteData(format::BufferV *Data)
 
 void BP5Writer::WriteData_EveryoneWrites(format::BufferV *Data, bool SerializedWriters)
 {
-    if (m_Parameters.AggregationType == (int)AggregationType::DataSizeBased)
-    {
-        if (!m_AggregatorInitializedThisStep)
-        {
-            // We can't allow ranks to change subfiles between calls to Put(), so we only
-            // do this initialization once per timestep. Consequently, partition decision
-            // could be based on incomplete step data.
-            InitAggregator(Data->Size());
-            InitTransports();
-            m_AggregatorInitializedThisStep = true;
-        }
-    }
-
     const aggregator::MPIChain *a = dynamic_cast<aggregator::MPIChain *>(m_Aggregator);
 
     // new step writing starts at offset m_DataPos on aggregator
@@ -1020,6 +1030,33 @@ void BP5Writer::EndStep()
 
     m_Profiler.Stop("ES_AWD");
 
+    if (m_Parameters.verbose > 2)
+    {
+        std::cout << "Rank " << m_Comm.Rank() << " deciding whether new writer map is needed"
+                  << std::endl;
+        std::cout << "    m_WriterStep: " << m_WriterStep << std::endl;
+        std::cout << "    m_AppendWriterCount: " << m_AppendWriterCount
+                  << ", m_Comm.Size(): " << m_Comm.Size() << std::endl;
+        std::cout << "    m_AppendAggregatorCount: " << m_AppendAggregatorCount
+                  << ", m_Aggregator->m_NumAggregators: " << m_Aggregator->m_NumAggregators
+                  << std::endl;
+        std::cout << "    m_AppendSubfileCount: " << m_AppendSubfileCount
+                  << ", m_Aggregator->m_SubStreams: " << m_Aggregator->m_SubStreams << std::endl;
+    }
+
+    if (!m_WriterStep || m_AppendWriterCount != static_cast<uint64_t>(m_Comm.Size()) ||
+        m_AppendAggregatorCount != static_cast<uint64_t>(m_Aggregator->m_NumAggregators) ||
+        m_AppendSubfileCount != static_cast<uint64_t>(m_Aggregator->m_SubStreams))
+    {
+        // new Writer Map is needed
+        if (m_Parameters.verbose > 2)
+        {
+            std::cout << "Rank " << m_Comm.Rank() << " new writer map needed" << std::endl;
+        }
+        const uint64_t a = static_cast<uint64_t>(m_Aggregator->m_SubStreamIndex);
+        m_WriterSubfileMap = m_Comm.GatherValues(a, 0);
+    }
+
     if (m_Parameters.UseSelectiveMetadataAggregation)
     {
         SelectiveAggregationMetadata(TSInfo);
@@ -1609,29 +1646,9 @@ void BP5Writer::InitMetadataTransports()
 
 void BP5Writer::InitTransports()
 {
-    std::string cacheKey = GetCacheKey(m_Aggregator);
-    auto search = m_AggregatorSpecifics.find(cacheKey);
-    bool cacheHit = false;
+    OpenSubfile();
 
-    if (search != m_AggregatorSpecifics.end())
-    {
-        if (m_Parameters.verbose > 2)
-        {
-            std::cout << "Rank " << m_Comm.Rank() << " cache hit for aggregator key " << cacheKey
-                      << std::endl;
-        }
-        cacheHit = true;
-    }
-    else
-    {
-        // Didn't have one in the cache, add it now
-        m_AggregatorSpecifics.emplace(std::make_pair(cacheKey, AggTransportData(m_IO, m_Comm)));
-    }
-
-    AggTransportData &aggData = m_AggregatorSpecifics.at(cacheKey);
-
-    // /path/name.bp.dir/name.bp.rank
-    aggData.m_SubStreamNames = GetBPSubStreamNames({m_Name}, m_Aggregator->m_SubStreamIndex);
+    AggTransportData &aggData = m_AggregatorSpecifics.at(GetCacheKey(m_Aggregator));
 
     if (m_IAmDraining)
     {
@@ -1658,9 +1675,49 @@ void BP5Writer::InitTransports()
             aggData.m_DrainSubStreamNames, m_IO.m_TransportsParameters, m_Parameters.NodeLocal);
     }
 
+    if (m_IAmDraining)
+    {
+        if (m_DrainBB)
+        {
+            for (const auto &name : aggData.m_DrainSubStreamNames)
+            {
+                m_FileDrainer.AddOperationOpen(name, m_OpenMode);
+            }
+        }
+    }
+
+    InitBPBuffer();
+}
+
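The DataSizeBased path above is driven by the helper::Partitioning result now
cached in m_Partitioning. A rough sketch of the call pattern used in
GetPartitionInfo (variable names illustrative; allsizes is the gathered vector
of per-rank byte counts, assumed to exist):

    std::vector<uint64_t> allsizes = /* per-rank data sizes, gathered earlier */;
    helper::Partitioning p = helper::PartitionRanks(allsizes, numPartitions);
    p.PrintSummary();                                    // optional, rank 0 only
    helper::RankPartition mine = p.FindPartition(myRank); // my group/subfile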
+void BP5Writer::OpenSubfile(bool useComm, bool forceAppend)
+{
+    std::string cacheKey = GetCacheKey(m_Aggregator);
+    auto search = m_AggregatorSpecifics.find(cacheKey);
+    bool cacheHit = false;
+
+    if (search != m_AggregatorSpecifics.end())
+    {
+        if (m_Parameters.verbose > 2)
+        {
+            std::cout << "Rank " << m_Comm.Rank() << " cache hit for aggregator key " << cacheKey
+                      << std::endl;
+        }
+        cacheHit = true;
+    }
+    else
+    {
+        // Didn't have one in the cache, add it now
+        m_AggregatorSpecifics.emplace(std::make_pair(cacheKey, AggTransportData(m_IO, m_Comm)));
+    }
+
+    AggTransportData &aggData = m_AggregatorSpecifics.at(cacheKey);
+
+    // /path/name.bp.dir/name.bp.rank
+    aggData.m_SubStreamNames = GetBPSubStreamNames({m_Name}, m_Aggregator->m_SubStreamIndex);
+
     helper::Comm openSyncComm;
-    if (m_Parameters.AggregationType == (int)AggregationType::DataSizeBased)
+    if (m_Parameters.AggregationType == (int)AggregationType::DataSizeBased && useComm)
     {
         // Split my writer chain so only ranks that actually need to open a
         // file can do so in an ordered fashion.
@@ -1679,7 +1736,7 @@ void BP5Writer::InitTransports()
 
     if (m_Parameters.AggregationType == (int)AggregationType::DataSizeBased)
     {
-        if (m_WriterStep > 0)
+        if (m_WriterStep > 0 || forceAppend)
         {
             // override the mode to be append if we're opening a file that
             // was already opened by another rank.
@@ -1691,29 +1748,24 @@ void BP5Writer::InitTransports()
         {
             std::cout << "Rank " << m_Comm.Rank() << " opening data file" << std::endl;
         }
-        aggData.m_FileDataManager.OpenFiles(aggData.m_SubStreamNames, mode,
-                                            m_IO.m_TransportsParameters, true,
-                                            *DataWritingComm);
+        if (useComm)
+        {
+            aggData.m_FileDataManager.OpenFiles(aggData.m_SubStreamNames, mode,
+                                                m_IO.m_TransportsParameters, true,
+                                                *DataWritingComm);
+        }
+        else
+        {
+            aggData.m_FileDataManager.OpenFiles(aggData.m_SubStreamNames, mode,
+                                                m_IO.m_TransportsParameters, true);
+        }
     }
 
-    if (m_Parameters.AggregationType == (int)AggregationType::DataSizeBased)
+    if (m_Parameters.AggregationType == (int)AggregationType::DataSizeBased && useComm)
     {
         openSyncComm.Free();
     }
-
-    if (m_IAmDraining)
-    {
-        if (m_DrainBB)
-        {
-            for (const auto &name : aggData.m_DrainSubStreamNames)
-            {
-                m_FileDrainer.AddOperationOpen(name, m_OpenMode);
-            }
-        }
-    }
-
-    this->InitBPBuffer();
 }
 
 /*generate the header for the metadata index file*/
@@ -1963,33 +2015,6 @@ void BP5Writer::InitBPBuffer()
     {
         m_WriterDataPos.resize(m_Comm.Size());
     }
-
-    if (m_Parameters.verbose > 2)
-    {
-        std::cout << "Rank " << m_Comm.Rank() << " deciding whether new writer map is needed"
-                  << std::endl;
-        std::cout << "    m_WriterStep: " << m_WriterStep << std::endl;
-        std::cout << "    m_AppendWriterCount: " << m_AppendWriterCount
-                  << ", m_Comm.Size(): " << m_Comm.Size() << std::endl;
-        std::cout << "    m_AppendAggregatorCount: " << m_AppendAggregatorCount
-                  << ", m_Aggregator->m_NumAggregators: " << m_Aggregator->m_NumAggregators
-                  << std::endl;
-        std::cout << "    m_AppendSubfileCount: " << m_AppendSubfileCount
-                  << ", m_Aggregator->m_SubStreams: " << m_Aggregator->m_SubStreams << std::endl;
-    }
-
-    if (!m_WriterStep || m_AppendWriterCount != static_cast<uint64_t>(m_Comm.Size()) ||
-        m_AppendAggregatorCount != static_cast<uint64_t>(m_Aggregator->m_NumAggregators) ||
-        m_AppendSubfileCount != static_cast<uint64_t>(m_Aggregator->m_SubStreams))
-    {
-        // new Writer Map is needed, generate now, write later
-        if (m_Parameters.verbose > 2)
-        {
-            std::cout << "Rank " << m_Comm.Rank() << " new writer map needed" << std::endl;
-        }
-        const uint64_t a = static_cast<uint64_t>(m_Aggregator->m_SubStreamIndex);
-        m_WriterSubfileMap = m_Comm.GatherValues(a, 0);
-    }
 }
 
 void BP5Writer::EnterComputationBlock() noexcept
diff --git a/source/adios2/engine/bp5/BP5Writer.h b/source/adios2/engine/bp5/BP5Writer.h
index 4ded24e8d2..594a113561 100644
--- a/source/adios2/engine/bp5/BP5Writer.h
+++ b/source/adios2/engine/bp5/BP5Writer.h
@@ -78,6 +78,8 @@ class BP5Writer : public BP5Engine, public core::Engine
     std::map<std::string, AggTransportData> m_AggregatorSpecifics;
     helper::RankPartition GetPartitionInfo(const uint64_t rankDataSize, const int subStreams,
                                            helper::Comm const &parentComm);
+    void OpenSubfile(const bool useComm = true, const bool forceAppend = false);
+    helper::Partitioning m_Partitioning;
 
     /** Single object controlling BP buffering */
     format::BP5Serializer m_BP5Serializer;
@@ -196,6 +198,7 @@ class BP5Writer : public BP5Engine, public core::Engine
     /** Write Data to disk, in an aggregator chain */
     void WriteData(format::BufferV *Data);
     void WriteData_EveryoneWrites(format::BufferV *Data, bool SerializedWriters);
+    void WriteData_WithRerouting(format::BufferV *Data);
     void WriteData_EveryoneWrites_Async(format::BufferV *Data, bool SerializedWriters);
     void WriteData_TwoLevelShm(format::BufferV *Data);
     void WriteData_TwoLevelShm_Async(format::BufferV *Data);
@@ -302,6 +305,17 @@ class BP5Writer : public BP5Engine, public core::Engine
      */
     uint64_t CountStepsInMetadataIndex(format::BufferSTL &bufferSTL);
 
+    // Thread function for inter-rank communication when rerouting aggregation
+    // is enabled
+    void ReroutingCommunicationLoop();
+    int m_TargetIndex;
+    int m_TargetCoordinator;
+    std::mutex m_WriteMutex;
+    std::mutex m_NotifMutex;
+    std::condition_variable m_WriteCV;
+    bool m_ReadyToWrite;
+    bool m_FinishedWriting;
+
     /* Async write's future */
     std::future<int> m_WriteFuture;
     // variables to delay writing to index file
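The members added above implement a small two-thread handshake inside each rank:
the main thread blocks on m_WriteCV until the communication thread (running
ReroutingCommunicationLoop) receives a DO_WRITE message and flips m_ReadyToWrite.
Distilled to standard C++ (member names from the header above, everything else
illustrative):

    #include <condition_variable>
    #include <mutex>
    #include <thread>

    std::mutex writeMutex;
    std::condition_variable writeCV;
    bool readyToWrite = false;

    void CommThread() // stands in for ReroutingCommunicationLoop()
    {
        // ... on receiving DO_WRITE ...
        std::unique_lock<std::mutex> lck(writeMutex);
        readyToWrite = true;
        writeCV.notify_one(); // wake the writer
    }

    void Writer() // stands in for WriteData_WithRerouting()
    {
        std::thread comm(CommThread);
        {
            std::unique_lock<std::mutex> lck(writeMutex);
            writeCV.wait(lck, [&] { return readyToWrite; }); // predicate avoids lost wake-ups
        }
        // ... write the subfile, signal completion, then ...
        comm.join();
    }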
diff --git a/source/adios2/engine/bp5/BP5Writer_WithRerouting.cpp b/source/adios2/engine/bp5/BP5Writer_WithRerouting.cpp
new file mode 100644
index 0000000000..f36cf36560
--- /dev/null
+++ b/source/adios2/engine/bp5/BP5Writer_WithRerouting.cpp
@@ -0,0 +1,777 @@
+/*
+ * Distributed under the OSI-approved Apache License, Version 2.0. See
+ * accompanying file Copyright.txt for details.
+ *
+ * BP5Writer_WithRerouting.cpp
+ *
+ */
+
+#include "BP5Writer.h"
+
+#include "adios2/common/ADIOSMacros.h"
+#include "adios2/core/IO.h"
+#include "adios2/helper/adiosFunctions.h" //CheckIndexRange
+#include "adios2/helper/adiosRerouting.h"
+#include "adios2/toolkit/format/buffer/chunk/ChunkV.h"
+#include "adios2/toolkit/format/buffer/malloc/MallocV.h"
+#include "adios2/toolkit/transport/file/FileFStream.h"
+#include <ciso646>
+
+#include <iostream>
+#include <queue>
+#include <set>
+#include <thread>
+
+namespace
+{
+class BufferPool
+{
+public:
+    BufferPool(int size) { m_Pool.resize(size); }
+
+    ~BufferPool() = default;
+
+    std::vector<char> &GetNextBuffer()
+    {
+        size_t bufferIdx = m_CurrentBufferIdx;
+
+        if (m_CurrentBufferIdx < m_Pool.size() - 1)
+        {
+            m_CurrentBufferIdx += 1;
+        }
+        else
+        {
+            m_CurrentBufferIdx = 0;
+        }
+
+        return m_Pool[bufferIdx];
+    }
+
+    size_t m_CurrentBufferIdx = 0;
+    std::vector<std::vector<char>> m_Pool;
+};
+
+struct WriterGroupState
+{
+    enum class Status
+    {
+        UNKNOWN,
+        WRITING,
+        IDLE,
+        PENDING,
+        CAPACITY,
+        CLOSED,
+    };
+
+    Status m_currentStatus;
+    int m_queueSize;
+    size_t m_subFileIndex;
+    // We could also keep track of number of times rerouted to/from
+};
+
+bool IsFinished(const WriterGroupState &state)
+{
+    return state.m_currentStatus == WriterGroupState::Status::CAPACITY ||
+           state.m_currentStatus == WriterGroupState::Status::IDLE;
+}
+
+/// Avoid visiting the same rerouting source or destination groups
+/// repeatedly
+class StateTraversal
+{
+public:
+    StateTraversal()
+    {
+        m_idlerIndex = 0;
+        m_writerIndex = 0;
+    }
+
+    enum class SearchResult
+    {
+        NOT_FOUND,
+        FOUND,
+        FINISHED,
+    };
+
+    SearchResult FindNextPair(const std::vector<WriterGroupState> &state,
+                              std::pair<size_t, size_t> &nextPair)
+    {
+        auto isIdle = [](WriterGroupState s) {
+            return s.m_currentStatus == WriterGroupState::Status::IDLE;
+        };
+
+        SearchResult idleResult = GetNext(isIdle, state, m_idlerIndex, nextPair.first);
+
+        if (idleResult != SearchResult::FOUND)
+        {
+            return idleResult;
+        }
+
+        auto canOffload = [](WriterGroupState s) {
+            return s.m_currentStatus == WriterGroupState::Status::WRITING && s.m_queueSize > 0;
+        };
+
+        return GetNext(canOffload, state, m_writerIndex, nextPair.second);
+    }
+
+private:
+    SearchResult GetNext(bool (*checkFn)(WriterGroupState),
+                         const std::vector<WriterGroupState> &state, size_t &searchIndex,
+                         size_t &foundIndex)
+    {
+        if (state.size() == 0)
+        {
+            return SearchResult::NOT_FOUND;
+        }
+
+        size_t checkedCount = 0;
+        size_t finishedCount = 0;
+
+        while (checkedCount < state.size())
+        {
+            checkedCount++;
+            searchIndex++;
+
+            if (searchIndex >= state.size())
+            {
+                searchIndex = 0;
+            }
+
+            if (checkFn(state[searchIndex]))
+            {
+                foundIndex = searchIndex;
+                return SearchResult::FOUND;
+            }
+            else if (IsFinished(state[searchIndex]))
+            {
+                finishedCount++;
+            }
+        }
+
+        if (finishedCount == state.size())
+        {
+            return SearchResult::FINISHED;
+        }
+
+        return SearchResult::NOT_FOUND;
+    }
+
+    size_t m_idlerIndex;
+    size_t m_writerIndex;
+};
+}
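BufferPool above deliberately trades safety for simplicity: it recycles slots
round-robin without tracking completion, so a slot must not still be attached to
a pending Isend when it comes around again (hence the generous pool size chosen
below). A tiny sketch of the contract (values illustrative):

    BufferPool pool(3);                           // slots 0, 1, 2
    std::vector<char> &b0 = pool.GetNextBuffer(); // slot 0
    std::vector<char> &b1 = pool.GetNextBuffer(); // slot 1
    std::vector<char> &b2 = pool.GetNextBuffer(); // slot 2
    std::vector<char> &b3 = pool.GetNextBuffer(); // slot 0 again: only safe if
                                                  // the send using b0 completed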
+
+namespace adios2
+{
+namespace core
+{
+namespace engine
+{
+
+using namespace adios2::format;
+
+void BP5Writer::ReroutingCommunicationLoop()
+{
+    using RerouteMessage = adios2::helper::RerouteMessage;
+
+    std::cout << "Rank " << m_RankMPI << " Enter ReroutingCommunicationLoop" << std::endl;
+
+    int subCoord = m_Aggregator->m_AggregatorRank;
+    bool iAmSubCoord = m_RankMPI == subCoord;
+
+    // Arbitrarily decide that the sub coordinator of the first partition is also
+    // the global coordinator
+    int globalCoord = m_Partitioning.m_Partitions[0][0];
+    bool iAmGlobalCoord = m_RankMPI == globalCoord;
+
+    // Some containers only used by augmented roles:
+
+    // sub coordinators maintain a queue of writers
+    std::queue<int> writerQueue;
+
+    // global coordinator keeps track of state of each subcoord
+    std::vector<WriterGroupState> groupState;
+    StateTraversal pairFinder;
+    std::vector<int> subCoordRanks;
+    std::map<int, size_t> scRankToIndex;
+    bool firstIdleMsg = true;
+    std::set<int> closeAcksNeeded;
+    std::set<int> groupIdlesNeeded;
+    bool waitingForCloseAcks = false;
+
+    // Sends are non-blocking. We use the pool to avoid the situation where the
+    // buffer is destructed before the send is complete. If we start seeing
+    // many Sends pending for a long time, that could cause us to exhaust the
+    // pool of buffers, at which point we would start overwriting buffers in the
+    // pool, potentially creating errors which are difficult to debug/diagnose.
+    BufferPool sendBuffers(100);
+    std::vector<char> recvBuffer;
+    int writingRank = -1;
+    uint64_t currentFilePos = 0;
+    bool sentFinished = false;
+    bool receivedGroupClose = false;
+    bool expectingWriteCompletion = false;
+    bool sentIdle = false;
+
+    if (iAmGlobalCoord)
+    {
+        groupState.resize(m_CommAggregators.Size());
+        subCoordRanks.resize(m_CommAggregators.Size());
+
+        for (size_t i = 0; i < m_Partitioning.m_Partitions.size(); ++i)
+        {
+            // Status remains unknown until we hear something specific (i.e. idle
+            // message or status inquiry response)
+            groupState[i].m_currentStatus = WriterGroupState::Status::UNKNOWN;
+            groupState[i].m_subFileIndex = i;
+            subCoordRanks[i] = m_Partitioning.m_Partitions[i][0];
+            scRankToIndex[m_Partitioning.m_Partitions[i][0]] = i;
+            closeAcksNeeded.insert(subCoordRanks[i]);
+            groupIdlesNeeded.insert(subCoordRanks[i]);
+        }
+    }
+
+    if (iAmSubCoord)
+    {
+        // Pre-populate my queue with the ranks in my group/partition
+        const std::vector<size_t> &groupRanks =
+            m_Partitioning.m_Partitions[m_Aggregator->m_SubStreamIndex];
+        for (auto rank : groupRanks)
+        {
+            writerQueue.push(static_cast<int>(rank));
+        }
+
+        currentFilePos = m_DataPos;
+
+        if (m_DataPosShared)
+        {
+            // We have shared data pos after a previous timestep, we should update our
+            // notion of m_DataPos
+            currentFilePos = m_SubstreamDataPos[m_Aggregator->m_SubStreamIndex];
+            m_DataPosShared = false;
+        }
+    }
+
+    auto keepGoing = [&]() {
+        if (iAmSubCoord)
+        {
+            return !receivedGroupClose || expectingWriteCompletion || waitingForCloseAcks;
+        }
+
+        return !receivedGroupClose;
+    };
+
+    while (keepGoing())
+    {
+        int msgReady = 0;
+        helper::Comm::Status status =
+            m_Comm.Iprobe(static_cast<int>(helper::Comm::Constants::CommRecvAny), 0, &msgReady);
+
+        // If there is a message ready, receive and handle it
+        if (msgReady)
+        {
+            RerouteMessage message;
+            message.BlockingRecvFrom(m_Comm, status.Source, recvBuffer);
+
+            switch ((RerouteMessage::MessageType)message.m_MsgType)
+            {
+            case RerouteMessage::MessageType::DO_WRITE:
+                std::cout << "Rank " << m_RankMPI << " received DO_WRITE" << std::endl;
+                // msg for all processes
+                {
+                    std::unique_lock<std::mutex> lck(m_WriteMutex);
+                    m_TargetIndex = message.m_WildCard;
+                    m_DataPos = message.m_Offset;
+                    // m_TargetCoordinator = message.m_SrcRank;
+                    m_TargetCoordinator = status.Source;
+                    m_ReadyToWrite = true;
+                    m_WriteCV.notify_one();
+                }
+                break;
+            case RerouteMessage::MessageType::WRITE_COMPLETION:
+                std::cout << "Rank " << m_RankMPI << " received WRITE_COMPLETION from rank "
+                          << status.Source << std::endl;
+                // msg for sub coordinator
+                currentFilePos = message.m_Offset;
+                writingRank = -1;
+                expectingWriteCompletion = false;
+                break;
+            case RerouteMessage::MessageType::GROUP_CLOSE:
+                std::cout << "Rank " << m_RankMPI << " received GROUP_CLOSE from rank "
rank " + << status.Source << std::endl; + // msg for sub coordinator + receivedGroupClose = true; + + std::cout << "Rank " << m_RankMPI << " sending GROUP_CLOSE_ACK to rank " + << globalCoord << std::endl; + adios2::helper::RerouteMessage closeAckMsg; + closeAckMsg.m_MsgType = RerouteMessage::MessageType::GROUP_CLOSE_ACK; + closeAckMsg.m_SrcRank = m_RankMPI; + closeAckMsg.m_DestRank = globalCoord; + closeAckMsg.m_WildCard = m_Aggregator->m_SubStreamIndex; + closeAckMsg.BlockingSendTo(m_Comm, globalCoord, sendBuffers.GetNextBuffer()); + break; + case RerouteMessage::MessageType::GROUP_CLOSE_ACK: + // msg for global coordinator + { + std::cout << "Rank " << m_RankMPI << " received GROUP_CLOSE_ACK from rank " + << status.Source << std::endl; + + size_t ackGroupIdx = static_cast(message.m_WildCard); + groupState[ackGroupIdx].m_currentStatus = WriterGroupState::Status::CLOSED; + closeAcksNeeded.erase(status.Source); + } + break; + case RerouteMessage::MessageType::WRITER_IDLE: + std::cout << "Rank " << m_RankMPI << " received WRITER_IDLE from rank " + << status.Source << std::endl; + // msg for global coordinator + { + // int idleWriter = message.m_SrcRank; + int idleWriter = status.Source; + size_t idleGroup = static_cast(message.m_WildCard); + groupState[idleGroup].m_currentStatus = WriterGroupState::Status::IDLE; + groupState[idleGroup].m_queueSize = 0; + groupIdlesNeeded.erase(idleWriter); + + if (firstIdleMsg) + { + for (size_t i = 0; i < subCoordRanks.size(); ++i) + { + int scRank = subCoordRanks[i]; + // No need to ask the sender of the WRITER_IDLE msg how big their queue + // is + if (scRank != idleWriter) + { + std::cout << "GC (" << m_RankMPI + << ") sending STATUS_INQUIRY to rank " << scRank + << std::endl; + adios2::helper::RerouteMessage inquiryMsg; + inquiryMsg.m_MsgType = RerouteMessage::MessageType::STATUS_INQUIRY; + inquiryMsg.m_SrcRank = m_RankMPI; + inquiryMsg.m_DestRank = scRank; + inquiryMsg.NonBlockingSendTo(m_Comm, scRank, + sendBuffers.GetNextBuffer()); + } + } + + firstIdleMsg = false; + } + } + break; + case RerouteMessage::MessageType::WRITER_CAPACITY: + std::cout << "Rank " << m_RankMPI << " received WRITER_CAPACITY from rank " + << status.Source << std::endl; + // msg for global coordinator + { + int capacityGroup = message.m_WildCard; + groupState[capacityGroup].m_currentStatus = WriterGroupState::Status::CAPACITY; + } + break; + case RerouteMessage::MessageType::STATUS_INQUIRY: + std::cout << "Rank " << m_RankMPI << " received STATUS_INQUIRY from rank " + << status.Source << std::endl; + // msg for sub coordinator + adios2::helper::RerouteMessage replyMsg; + replyMsg.m_MsgType = RerouteMessage::MessageType::STATUS_REPLY; + replyMsg.m_SrcRank = m_RankMPI; + replyMsg.m_DestRank = globalCoord; + replyMsg.m_WildCard = static_cast(m_Aggregator->m_SubStreamIndex); + replyMsg.m_Size = static_cast(writerQueue.size()); + replyMsg.NonBlockingSendTo(m_Comm, globalCoord, sendBuffers.GetNextBuffer()); + break; + case RerouteMessage::MessageType::STATUS_REPLY: + std::cout << "Rank " << m_RankMPI << " received STATUS_REPLY from rank " + << status.Source << std::endl; + // msg for global coordinator + { + size_t subStreamIdx = static_cast(message.m_WildCard); + int qSize = static_cast(message.m_Size); + groupState[subStreamIdx].m_queueSize = qSize; + groupState[subStreamIdx].m_currentStatus = + qSize > 0 ? 
+                        qSize > 0 ? WriterGroupState::Status::WRITING
+                                  : WriterGroupState::Status::IDLE;
+                }
+                break;
+            case RerouteMessage::MessageType::REROUTE_REQUEST:
+                std::cout << "Rank " << m_RankMPI << " received REROUTE_REQUEST from rank "
+                          << status.Source << std::endl;
+                // msg for sub coordinator
+                if (writerQueue.empty())
+                {
+                    std::cout << "Rank " << m_RankMPI << " sending REROUTE_REJECT to rank "
+                              << globalCoord << std::endl;
+                    adios2::helper::RerouteMessage rejectMsg;
+                    rejectMsg.m_MsgType = RerouteMessage::MessageType::REROUTE_REJECT;
+                    rejectMsg.m_SrcRank = message.m_SrcRank;
+                    rejectMsg.m_DestRank = message.m_DestRank;
+                    rejectMsg.NonBlockingSendTo(m_Comm, globalCoord, sendBuffers.GetNextBuffer());
+                }
+                else
+                {
+                    std::cout << "Rank " << m_RankMPI << " sending REROUTE_ACK to rank "
+                              << globalCoord << std::endl;
+                    int reroutedRank = writerQueue.front();
+                    writerQueue.pop();
+
+                    adios2::helper::RerouteMessage ackMsg;
+                    ackMsg.m_MsgType = RerouteMessage::MessageType::REROUTE_ACK;
+                    ackMsg.m_SrcRank = message.m_SrcRank;
+                    ackMsg.m_DestRank = message.m_DestRank;
+                    ackMsg.m_WildCard = reroutedRank;
+                    ackMsg.NonBlockingSendTo(m_Comm, globalCoord, sendBuffers.GetNextBuffer());
+                }
+                break;
+            case RerouteMessage::MessageType::REROUTE_REJECT:
+                std::cout << "Rank " << m_RankMPI << " received REROUTE_REJECT from rank "
+                          << status.Source << std::endl;
+                {
+                    // msg for global coordinator
+
+                    size_t srcIdx = scRankToIndex[message.m_SrcRank];
+                    size_t destIdx = scRankToIndex[message.m_DestRank];
+
+                    // Both the src and target subcoord states return from PENDING to their
+                    // prior state
+                    if (groupState[srcIdx].m_currentStatus == WriterGroupState::Status::PENDING)
+                    {
+                        groupState[srcIdx].m_currentStatus = WriterGroupState::Status::WRITING;
+                        groupState[srcIdx].m_queueSize = 0;
+                    }
+
+                    if (groupState[destIdx].m_currentStatus == WriterGroupState::Status::PENDING)
+                    {
+                        groupState[destIdx].m_currentStatus = WriterGroupState::Status::IDLE;
+                    }
+                }
+                break;
+            case RerouteMessage::MessageType::REROUTE_ACK:
+                std::cout << "Rank " << m_RankMPI << " received REROUTE_ACK from rank "
+                          << status.Source << std::endl;
+                // msg for global coordinator
+                {
+                    std::cout << "Rank " << m_RankMPI << " sending WRITE_MORE to rank "
+                              << message.m_DestRank << std::endl;
+
+                    // Send the lucky volunteer another writer
+                    adios2::helper::RerouteMessage writeMoreMsg;
+                    writeMoreMsg.m_MsgType = RerouteMessage::MessageType::WRITE_MORE;
+                    writeMoreMsg.m_WildCard = message.m_WildCard; // i.e. the rerouted writer rank
+                    writeMoreMsg.NonBlockingSendTo(m_Comm, message.m_DestRank,
+                                                   sendBuffers.GetNextBuffer());
+
+                    groupIdlesNeeded.insert(message.m_DestRank);
+
+                    // Src subcoord state is returned to writing, dest subcoord state is now
+                    // writing as well
+                    size_t srcIdx = scRankToIndex[message.m_SrcRank];
+                    size_t destIdx = scRankToIndex[message.m_DestRank];
+                    groupState[srcIdx].m_currentStatus = WriterGroupState::Status::WRITING;
+                    groupState[srcIdx].m_queueSize -= 1;
+                    groupState[destIdx].m_currentStatus = WriterGroupState::Status::WRITING;
+                    // groupState[destIdx].m_queueSize += 1;
+                }
+                break;
+            case RerouteMessage::MessageType::WRITE_MORE:
+                std::cout << "Rank " << m_RankMPI << " received WRITE_MORE from rank "
+                          << status.Source << std::endl;
+                // msg for sub coordinator
+                writerQueue.push(message.m_WildCard);
+                sentIdle = false;
+                break;
+            default:
+                break;
+            }
+        }
+
+        // All processes
+        // Check if writing has finished, and alert the target SC
+        if (!sentFinished)
+        {
+            std::lock_guard<std::mutex> lck(m_NotifMutex);
+            if (m_FinishedWriting)
+            {
+                adios2::helper::RerouteMessage writeCompleteMsg;
+                writeCompleteMsg.m_MsgType = RerouteMessage::MessageType::WRITE_COMPLETION;
+                writeCompleteMsg.m_SrcRank = m_RankMPI;
+                writeCompleteMsg.m_DestRank = m_TargetCoordinator;
+                writeCompleteMsg.m_WildCard = m_TargetIndex;
+                writeCompleteMsg.m_Offset = m_DataPos;
+
+                if (!iAmSubCoord && !iAmGlobalCoord)
+                {
+                    std::cout << "Rank " << m_RankMPI << " notifying SC (" << m_TargetCoordinator
+                              << ") of write completion -- BLOCKING" << std::endl;
+                    // My only role was to write (no communication responsibility) so I am
+                    // done at this point. However, I need to do a blocking send because I
+                    // am about to return from this function, at which point my buffer pool
+                    // goes away.
+                    writeCompleteMsg.BlockingSendTo(m_Comm, m_TargetCoordinator,
+                                                    sendBuffers.GetNextBuffer());
+
+                    receivedGroupClose = true;
+                    continue;
+                }
+                else
+                {
+                    std::cout << "Rank " << m_RankMPI << " notifying SC (" << m_TargetCoordinator
+                              << ") of write completion -- NONBLOCKING" << std::endl;
+                    writeCompleteMsg.NonBlockingSendTo(m_Comm, m_TargetCoordinator,
+                                                       sendBuffers.GetNextBuffer());
+                }
+
+                sentFinished = true;
+            }
+        }
+
+        // Subcoordinator processes
+        // Check if anyone is writing right now, and if not, ask the next writer to start
+        if (iAmSubCoord && writingRank == -1)
+        {
+            if (!writerQueue.empty())
+            {
+                // Pop the queue and send DO_WRITE
+                int nextWriter = writerQueue.front();
+                std::cout << "Rank " << m_RankMPI << " sending DO_WRITE to " << nextWriter
+                          << std::endl;
+                writerQueue.pop();
+                adios2::helper::RerouteMessage writeMsg;
+                writeMsg.m_MsgType = RerouteMessage::MessageType::DO_WRITE;
+                writeMsg.m_SrcRank = m_RankMPI;
+                writeMsg.m_DestRank = nextWriter;
+                writeMsg.m_WildCard = static_cast<int>(m_Aggregator->m_SubStreamIndex);
+                writeMsg.m_Offset = currentFilePos;
+                writingRank = nextWriter;
+                expectingWriteCompletion = true;
+                writeMsg.NonBlockingSendTo(m_Comm, nextWriter, sendBuffers.GetNextBuffer());
+            }
+            else if (!sentIdle)
+            {
+                std::cout << "Rank " << m_RankMPI << " sending WRITER_IDLE to gc (" << globalCoord
+                          << ")" << std::endl;
+                // Writer queue now empty, send WRITER_IDLE to the GC
+                adios2::helper::RerouteMessage idleMsg;
+                idleMsg.m_MsgType = RerouteMessage::MessageType::WRITER_IDLE;
+                idleMsg.m_SrcRank = m_RankMPI;
+                idleMsg.m_DestRank = globalCoord;
+                idleMsg.m_WildCard = static_cast<int>(m_Aggregator->m_SubStreamIndex);
+                idleMsg.NonBlockingSendTo(m_Comm, globalCoord, sendBuffers.GetNextBuffer());
+                sentIdle = true;
+
+                // TODO: If this group is already over the threshold ratio, send
+                // TODO: WRITER_CAPACITY instead of WRITER_IDLE
+            }
+        }
+
+        // Global coordinator process
+        // Look for possible reroute-to / reroute-from pairs
+        if (iAmGlobalCoord && !waitingForCloseAcks)
+        {
+            std::cout << "GC (" << m_RankMPI << ") looking for reroute candidate pair" << std::endl;
+            std::pair<size_t, size_t> nextPair;
+            StateTraversal::SearchResult result = pairFinder.FindNextPair(groupState, nextPair);
+
+            if (result == StateTraversal::SearchResult::FOUND)
+            {
+                std::cout << "GC (" << m_RankMPI << ") found reroute candidate pair ("
+                          << nextPair.first << ", " << nextPair.second
+                          << "), sending REROUTE_REQUEST" << std::endl;
+                // Finding a pair means there was both an idle group and a writing
+                // group with at least one writer in its queue. With these, we will
+                // initiate a reroute sequence.
+                size_t idleIdx = nextPair.first;
+                int idleSubcoordRank = subCoordRanks[idleIdx];
+                size_t writerIdx = nextPair.second;
+                int writerSubcoordRank = subCoordRanks[writerIdx];
+
+                adios2::helper::RerouteMessage rerouteReqMsg;
+                rerouteReqMsg.m_MsgType = RerouteMessage::MessageType::REROUTE_REQUEST;
+                rerouteReqMsg.m_SrcRank = writerSubcoordRank;
+                rerouteReqMsg.m_DestRank = idleSubcoordRank;
+                rerouteReqMsg.NonBlockingSendTo(m_Comm, writerSubcoordRank,
+                                                sendBuffers.GetNextBuffer());
+
+                groupState[idleIdx].m_currentStatus = WriterGroupState::Status::PENDING;
+                groupState[writerIdx].m_currentStatus = WriterGroupState::Status::PENDING;
+            }
+            else if (result == StateTraversal::SearchResult::FINISHED && groupIdlesNeeded.empty())
+            {
+                for (size_t i = 0; i < groupState.size(); ++i)
+                {
+                    std::cout << "    group " << i
+                              << " status: " << static_cast<int>(groupState[i].m_currentStatus)
+                              << ", queue size: " << groupState[i].m_queueSize << std::endl;
+                }
+                // If we didn't find a pair, it could be because all the groups are
+                // done writing (either idle or possibly at capacity). In that case,
+                // we need to release the subcoordinators (and ourself) from their
+                // comm loop.
+                for (size_t scIdx = 0; scIdx < subCoordRanks.size(); ++scIdx)
+                {
+                    if (subCoordRanks[scIdx] != globalCoord)
+                    {
+                        std::cout << "Rank " << m_RankMPI << " sending GROUP_CLOSE to rank "
+                                  << subCoordRanks[scIdx] << std::endl;
+                        adios2::helper::RerouteMessage closeMsg;
+                        closeMsg.m_MsgType = RerouteMessage::MessageType::GROUP_CLOSE;
+                        closeMsg.NonBlockingSendTo(m_Comm, subCoordRanks[scIdx],
+                                                   sendBuffers.GetNextBuffer());
+                    }
+                }
+
+                std::cout << "Rank " << m_RankMPI << " marking my own close ack as received"
+                          << std::endl;
+                receivedGroupClose = true;
+                closeAcksNeeded.erase(globalCoord);
+
+                waitingForCloseAcks = true;
+            }
+            else
+            {
+                std::cout << "candidate pair search returned ";
+
+                switch (result)
+                {
+                case StateTraversal::SearchResult::NOT_FOUND:
+                    std::cout << "NOT FOUND" << std::endl;
+                    std::cout << "    states: [ ";
+                    for (size_t i = 0; i < groupState.size(); ++i)
+                    {
+                        std::cout << static_cast<int>(groupState[i].m_currentStatus) << " ";
+                    }
+                    std::cout << "]" << std::endl;
+                    break;
+                case StateTraversal::SearchResult::FOUND:
+                    std::cout << "FOUND" << std::endl;
+                    break;
+                case StateTraversal::SearchResult::FINISHED:
+                    std::cout << "FINISHED" << std::endl;
+                    break;
+                }
+            }
+        }
+
+        if (iAmGlobalCoord)
+        {
+            if (waitingForCloseAcks)
+            {
+                if (closeAcksNeeded.empty())
+                {
+                    std::cout << "Rank " << m_RankMPI << " got all my close acks" << std::endl;
+                    // global coordinator received the final close ack, now it can leave
+                    waitingForCloseAcks = false;
+                }
+                else
+                {
+                    std::cout << "Rank " << m_RankMPI << " still needs " << closeAcksNeeded.size()
+                              << " close acks [ ";
+                    for (int n : closeAcksNeeded)
+                    {
+                        std::cout << " " << n;
+                    }
+                    std::cout << " ]" << std::endl;
+                }
+            }
+        }
+    }
+
+    // Before leaving this method, subcoordinators need to update the variable tracking
+    // the current file position for their particular subfile
+    if (iAmSubCoord)
+    {
+        m_DataPos = currentFilePos;
+    }
+
+    std::cout << "Rank " << m_RankMPI << " Exit ReroutingCommunicationLoop" << std::endl;
+}
+
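To make the global coordinator's pairing rule concrete, here is a hypothetical
standalone driver for the StateTraversal helper defined earlier in this file
(values invented for illustration):

    // Three groups: one idle, one with a backlog, one finished at capacity.
    std::vector<WriterGroupState> state(3);
    state[0].m_currentStatus = WriterGroupState::Status::IDLE;     // can accept work
    state[1].m_currentStatus = WriterGroupState::Status::WRITING;  // has a backlog
    state[1].m_queueSize = 2;
    state[2].m_currentStatus = WriterGroupState::Status::CAPACITY; // finished, full

    StateTraversal finder;
    std::pair<size_t, size_t> pair;
    auto r = finder.FindNextPair(state, pair);
    // r == FOUND, pair == {0, 1}: reroute one writer from group 1 to group 0.
    // Once every group is IDLE or CAPACITY, FindNextPair returns FINISHED.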
+void BP5Writer::WriteData_WithRerouting(format::BufferV *Data)
+{
+    // - start the communication loop running in a thread
+    // - wait to be signalled by the communication thread, then write:
+    //   - variables set by comm thread indicate subfile and offset
+    // - attempt to join the comm thread
+    // - return
+
+    // The communication function:
+    //
+    // - Send a message to your SC, requesting to write
+    // - enter the message loop where you:
+    //   - check if a message is ready, and if so:
+    //     - receive the message
+    //     - handle the message, based on your role and the sender
+    //   - if no message is ready, check if the writing is finished, and if so:
+    //     - send a message to SC telling them:
+    //       - you're done
+    //       - the new offset (or the amount you wrote)
+
+    m_ReadyToWrite = false;
+    m_FinishedWriting = false;
+
+    std::thread commThread(&BP5Writer::ReroutingCommunicationLoop, this);
+
+    std::cout << "Background thread for rank " << m_RankMPI << " is now running" << std::endl;
+
+    // wait until communication thread indicates it's our turn to write
+    {
+        std::unique_lock<std::mutex> lck(m_WriteMutex);
+        m_WriteCV.wait(lck, [this] { return m_ReadyToWrite; });
+    }
+
+    // Do the writing
+
+    std::cout << "Rank " << m_RankMPI << " signaled to write" << std::endl;
+
+    size_t substreamIdx = static_cast<size_t>(m_TargetIndex);
+
+    // Check if we need to update which file we are writing to
+    if (substreamIdx != m_Aggregator->m_SubStreamIndex)
+    {
+        // We were rerouted! Our aggregator subfile index is later exchanged with other
+        // ranks to be written to metadata, so update it here or the metadata will be
+        // wrong.
+        m_Aggregator->m_SubStreamIndex = substreamIdx;
+
+        // Open the subfile without doing any collective communications, since the global
+        // coordinator ensures only one rerouted rank opens this file at a time. Also,
+        // be sure to open in append mode because another rank already wrote to this file;
+        // opening without append mode in that case can result in a block of zeros getting
+        // written.
+        OpenSubfile(false, true);
+    }
+
+    // align to PAGE_SIZE
+    m_DataPos += helper::PaddingToAlignOffset(m_DataPos, m_Parameters.StripeSize);
+    m_StartDataPos = m_DataPos;
+
+    std::vector<core::iovec> DataVec = Data->DataVec();
+
+    AggTransportData &aggData = m_AggregatorSpecifics.at(GetCacheKey(m_Aggregator));
+    aggData.m_FileDataManager.WriteFileAt(DataVec.data(), DataVec.size(), m_StartDataPos);
+
+    m_DataPos += Data->Size();
+
+    // Now signal the communication thread that this rank has finished writing
+    {
+        std::lock_guard<std::mutex> lck(m_NotifMutex);
+        m_FinishedWriting = true;
+    }
+
+    commThread.join();
+}
+
+} // end namespace engine
+} // end namespace core
+} // end namespace adios2
diff --git a/source/adios2/helper/adiosComm.cpp b/source/adios2/helper/adiosComm.cpp
index 9109a35212..9b0bd918d9 100644
--- a/source/adios2/helper/adiosComm.cpp
+++ b/source/adios2/helper/adiosComm.cpp
@@ -91,6 +91,34 @@ std::string Comm::BroadcastFile(const std::string &fileName, const std::string h
     return fileContents;
 }
 
+Comm::Status Comm::Probe(int source, int tag, const std::string &hint) const
+{
+    if (source < 0 || source > m_Impl->Size() - 1)
+    {
+        if (source != static_cast<int>(Comm::Constants::CommRecvAny))
+        {
+            throw std::runtime_error("Invalid MPI source rank in Probe: " + std::to_string(source) +
+                                     " for a communicator of size " +
+                                     std::to_string(m_Impl->Size()));
+        }
+    }
+    return m_Impl->Probe(source, tag, hint);
+}
+
+Comm::Status Comm::Iprobe(int source, int tag, int *flag, const std::string &hint) const
+{
+    if (source < 0 || source > m_Impl->Size() - 1)
+    {
+        if (source != static_cast<int>(Comm::Constants::CommRecvAny))
+        {
+            throw std::runtime_error(
+                "Invalid MPI source rank in Iprobe: " + std::to_string(source) +
+                " for a communicator of size " + std::to_string(m_Impl->Size()));
+        }
+    }
+    return m_Impl->Iprobe(source, tag, flag, hint);
+}
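The rerouting loop uses these as a poll-then-receive pair. A minimal sketch
against the helper::Comm API added here (the comm object and buffer sizing are
assumptions for illustration):

    // Poll for a message from any rank, then receive from the reported source.
    int flag = 0;
    adios2::helper::Comm::Status st =
        comm.Iprobe(static_cast<int>(adios2::helper::Comm::Constants::CommRecvAny), 0, &flag);
    if (flag)
    {
        std::vector<char> buf(64);
        comm.Recv(buf.data(), buf.size(), st.Source, 0); // st.Source is now concrete
    }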
 
 std::vector<int> Comm::GetGathervDisplacements(const size_t *counts, const size_t countsSize)
 {
     std::vector<int> displacements(countsSize);
diff --git a/source/adios2/helper/adiosComm.h b/source/adios2/helper/adiosComm.h
index a7e35e20d0..86f5cdf65e 100644
--- a/source/adios2/helper/adiosComm.h
+++ b/source/adios2/helper/adiosComm.h
@@ -60,6 +60,14 @@ class Comm
         Shared
     };
 
+    /**
+     * @brief Various constants
+     */
+    enum class Constants
+    {
+        CommRecvAny = -10
+    };
+
     /**
      * @brief Default constructor. Produces an empty communicator.
      *
@@ -255,6 +263,10 @@ class Comm
     Req Irecv(T *buffer, const size_t count, int source, int tag,
               const std::string &hint = std::string()) const;
 
+    Status Probe(int source, int tag, const std::string &hint = std::string()) const;
+
+    Status Iprobe(int source, int tag, int *flag, const std::string &hint = std::string()) const;
+
     Win Win_allocate_shared(size_t size, int disp_unit, void *baseptr,
                             const std::string &hint = std::string());
     int Win_shared_query(Win &win, int rank, size_t *size, int *disp_unit, void *baseptr,
@@ -485,6 +497,10 @@ class CommImpl
     virtual Comm::Req Irecv(void *buffer, size_t count, Datatype datatype, int source, int tag,
                             const std::string &hint) const = 0;
 
+    virtual Comm::Status Probe(int source, int tag, const std::string &hint) const = 0;
+
+    virtual Comm::Status Iprobe(int source, int tag, int *flag, const std::string &hint) const = 0;
+
     virtual Comm::Win Win_allocate_shared(size_t size, int disp_unit, void *baseptr,
                                           const std::string &hint) const = 0;
     virtual int Win_shared_query(Comm::Win &win, int rank, size_t *size, int *disp_unit,
diff --git a/source/adios2/helper/adiosComm.inl b/source/adios2/helper/adiosComm.inl
index dac093af2e..035d22823e 100644
--- a/source/adios2/helper/adiosComm.inl
+++ b/source/adios2/helper/adiosComm.inl
@@ -259,9 +259,12 @@ Comm::Status Comm::Recv(T *buf, size_t count, int source, int tag,
 {
     if (source < 0 || source > m_Impl->Size() - 1)
     {
-        throw std::runtime_error(
-            "Invalid MPI source rank in Recv: " + std::to_string(source) +
-            " for a communicator of size " + std::to_string(m_Impl->Size()));
+        if (source != static_cast<int>(Comm::Constants::CommRecvAny))
+        {
+            throw std::runtime_error(
+                "Invalid MPI source rank in Recv: " + std::to_string(source) +
+                " for a communicator of size " + std::to_string(m_Impl->Size()));
+        }
     }
 
     return m_Impl->Recv(buf, count, CommImpl::GetDatatype<T>(), source, tag, hint);
@@ -296,9 +299,12 @@ Comm::Req Comm::Irecv(T *buffer, const size_t count, int source, int tag,
 {
     if (source < 0 || source > m_Impl->Size() - 1)
     {
-        throw std::runtime_error(
-            "Invalid MPI source rank in Irecv: " + std::to_string(source) +
-            " for a communicator of size " + std::to_string(m_Impl->Size()));
+        if (source != static_cast<int>(Comm::Constants::CommRecvAny))
+        {
+            throw std::runtime_error(
+                "Invalid MPI source rank in Irecv: " + std::to_string(source) +
+                " for a communicator of size " + std::to_string(m_Impl->Size()));
+        }
     }
 
     return m_Impl->Irecv(buffer, count, CommImpl::GetDatatype<T>(), source, tag, hint);
diff --git a/source/adios2/helper/adiosCommDummy.cpp b/source/adios2/helper/adiosCommDummy.cpp
index f7453ca5a7..af03911f32 100644
--- a/source/adios2/helper/adiosCommDummy.cpp
+++ b/source/adios2/helper/adiosCommDummy.cpp
@@ -111,6 +111,10 @@ class CommImplDummy : public CommImpl
     Comm::Req Irecv(void *buffer, size_t count, Datatype datatype, int source, int tag,
                     const std::string &hint) const override;
 
+    Comm::Status Probe(int source, int tag, const std::string &hint) const override;
+
+    Comm::Status Iprobe(int source, int tag, int *flag, const std::string &hint) const override;
+
     Comm::Win Win_allocate_shared(size_t size, int disp_unit, void *baseptr,
                                   const std::string &hint) const override;
     int Win_shared_query(Comm::Win &win, int rank, size_t *size, int *disp_unit, void *baseptr,
@@ -280,6 +284,18 @@ Comm::Req CommImplDummy::Irecv(void *, size_t, Datatype, int, int, const std::st
     return MakeReq(std::move(req));
 }
 
+Comm::Status CommImplDummy::Probe(int, int, const std::string &) const
+{
+    Comm::Status status;
+    return status;
+}
+
+Comm::Status CommImplDummy::Iprobe(int, int, int *flag, const std::string &) const
+{
+    // The dummy communicator never has a message pending
+    *flag = 0;
+    Comm::Status status;
+    return status;
+}
+
 Comm::Win CommImplDummy::Win_allocate_shared(size_t size, int disp_unit, void *baseptr,
                                              const std::string &) const
 {
diff --git a/source/adios2/helper/adiosCommMPI.cpp b/source/adios2/helper/adiosCommMPI.cpp
index 7e6d8b1bd1..eafef64df2 100644
--- a/source/adios2/helper/adiosCommMPI.cpp
+++ b/source/adios2/helper/adiosCommMPI.cpp
@@ -18,6 +18,7 @@
 
 #include "adios2/common/ADIOSTypes.h"
 
+#include <iostream>
 #include <string>
 
 namespace adios2
@@ -68,6 +69,16 @@ const MPI_Datatype DatatypeToMPI[] = {
     MPI_SHORT_INT,
 };
 
+int GetMPISource(int source)
+{
+    if (source == static_cast<int>(Comm::Constants::CommRecvAny))
+    {
+        return MPI_ANY_SOURCE;
+    }
+
+    return source;
+}
+
 MPI_Datatype ToMPI(CommImpl::Datatype dt) { return DatatypeToMPI[int(dt)]; }
 
 void CheckMPIReturn(const int value, const std::string &hint)
@@ -190,6 +201,10 @@ class CommImplMPI : public CommImpl
     Comm::Req Irecv(void *buffer, size_t count, Datatype datatype, int source, int tag,
                     const std::string &hint) const override;
 
+    Comm::Status Probe(int source, int tag, const std::string &hint) const override;
+
+    Comm::Status Iprobe(int source, int tag, int *flag, const std::string &hint) const override;
+
     Comm::Win Win_allocate_shared(size_t size, int disp_unit, void *baseptr,
                                   const std::string &hint) const override;
     int Win_shared_query(Comm::Win &win, int rank, size_t *size, int *disp_unit, void *baseptr,
@@ -387,9 +402,12 @@ Comm::Status CommImplMPI::Recv(void *buf, size_t count, Datatype datatype, int s
                                const std::string &hint) const
 {
     MPI_Status mpiStatus;
-    CheckMPIReturn(
-        MPI_Recv(buf, static_cast<int>(count), ToMPI(datatype), source, tag, m_MPIComm, &mpiStatus),
-        hint);
+    CheckMPIReturn(MPI_Recv(buf, static_cast<int>(count), ToMPI(datatype), GetMPISource(source),
+                            tag, m_MPIComm, &mpiStatus),
+                   hint);
 
     Comm::Status status;
     status.Source = mpiStatus.MPI_SOURCE;
@@ -447,8 +465,11 @@ Comm::Req CommImplMPI::Isend(const void *buffer, size_t count, Datatype datatype
     {
         int batchSize = static_cast<int>(count);
         MPI_Request mpiReq;
-        CheckMPIReturn(MPI_Isend(static_cast<char *>(const_cast<void *>(buffer)), batchSize,
-                                 ToMPI(datatype), dest, tag, m_MPIComm, &mpiReq),
+        CheckMPIReturn(MPI_Isend(const_cast<void *>(buffer), batchSize, ToMPI(datatype), dest, tag,
+                                 m_MPIComm, &mpiReq),
                        " in call to Isend with single batch " + hint + "\n");
         req->m_MPIReqs.emplace_back(mpiReq);
     }
@@ -470,7 +491,8 @@ Comm::Req CommImplMPI::Irecv(void *buffer, size_t count, Datatype datatype, int
             int batchSize = static_cast<int>(DefaultMaxFileBatchSize);
             MPI_Request mpiReq;
             CheckMPIReturn(MPI_Irecv(static_cast<char *>(buffer) + position, batchSize,
-                                     ToMPI(datatype), source, tag, m_MPIComm, &mpiReq),
+                                     ToMPI(datatype), GetMPISource(source), tag, m_MPIComm,
+                                     &mpiReq),
                            "in call to Irecv batch " + std::to_string(b) + " " + hint + "\n");
             req->m_MPIReqs.emplace_back(mpiReq);
 
@@ -483,7 +505,8 @@ Comm::Req CommImplMPI::Irecv(void *buffer, size_t count, Datatype datatype, int
             int batchSize = static_cast<int>(remainder);
             MPI_Request mpiReq;
             CheckMPIReturn(MPI_Irecv(static_cast<char *>(buffer) + position, batchSize,
-                                     ToMPI(datatype), source, tag, m_MPIComm, &mpiReq),
+                                     ToMPI(datatype), GetMPISource(source), tag, m_MPIComm,
+                                     &mpiReq),
                            "in call to Irecv remainder batch " + hint + "\n");
             req->m_MPIReqs.emplace_back(mpiReq);
         }
@@ -492,15 +515,35 @@ Comm::Req CommImplMPI::Irecv(void *buffer, size_t count, Datatype datatype, int
     {
         int batchSize = static_cast<int>(count);
         MPI_Request mpiReq;
-        CheckMPIReturn(
-            MPI_Irecv(buffer, batchSize, ToMPI(datatype), source, tag, m_MPIComm, &mpiReq),
-            " in call to Isend with single batch " + hint + "\n");
+        CheckMPIReturn(MPI_Irecv(buffer, batchSize, ToMPI(datatype), GetMPISource(source), tag,
+                                 m_MPIComm, &mpiReq),
+                       " in call to Irecv with single batch " + hint + "\n");
         req->m_MPIReqs.emplace_back(mpiReq);
     }
 
     return MakeReq(std::move(req));
 }
 
+Comm::Status CommImplMPI::Probe(int source, int tag, const std::string &hint) const
+{
+    MPI_Status mpiStatus;
+    CheckMPIReturn(MPI_Probe(GetMPISource(source), tag, m_MPIComm, &mpiStatus), hint);
+    Comm::Status status;
+    status.Source = mpiStatus.MPI_SOURCE;
+    status.Tag = mpiStatus.MPI_TAG;
+    return status;
+}
+
+Comm::Status CommImplMPI::Iprobe(int source, int tag, int *flag, const std::string &hint) const
+{
+    MPI_Status mpiStatus;
+    CheckMPIReturn(MPI_Iprobe(GetMPISource(source), tag, m_MPIComm, flag, &mpiStatus), hint);
+    Comm::Status status;
+    status.Source = mpiStatus.MPI_SOURCE;
+    status.Tag = mpiStatus.MPI_TAG;
+    return status;
+}
+
 Comm::Win CommImplMPI::Win_allocate_shared(size_t size, int disp_unit, void *baseptr,
                                            const std::string &hint) const
 {
diff --git a/source/adios2/helper/adiosRerouting.cpp b/source/adios2/helper/adiosRerouting.cpp
new file mode 100644
index 0000000000..1f6ab322ed
--- /dev/null
+++ b/source/adios2/helper/adiosRerouting.cpp
@@ -0,0 +1,106 @@
+/*
+ * Distributed under the OSI-approved Apache License, Version 2.0. See
+ * accompanying file Copyright.txt for details.
+ *
+ * adiosRerouting.cpp
+ *
+ *  Created on: Sept 26, 2025
+ *      Author: Scott Wittenburg scott.wittenburg@kitware.com
+ */
+
+#include "adiosRerouting.h"
+#include "adios2/helper/adiosComm.h"
+#include "adios2/helper/adiosMemory.h"
+
+/// \cond EXCLUDE_FROM_DOXYGEN
+#include <algorithm>
+#include <cstdint>
+#include <cstring>
+#include <iostream>
+#include <sstream>
+#include <string>
+#include <vector>
+/// \endcond
+
+namespace adios2
+{
+namespace helper
+{
+
+void RerouteMessage::NonBlockingSendTo(helper::Comm &comm, int destRank, std::vector<char> &buffer)
+{
+    size_t pos = 0;
+    buffer.resize(REROUTE_MESSAGE_SIZE);
+    helper::CopyToBuffer(buffer, pos, &this->m_MsgType);
+    helper::CopyToBuffer(buffer, pos, &this->m_SrcRank);
+    helper::CopyToBuffer(buffer, pos, &this->m_DestRank);
+    helper::CopyToBuffer(buffer, pos, &this->m_WildCard);
+    helper::CopyToBuffer(buffer, pos, &this->m_Offset);
+    helper::CopyToBuffer(buffer, pos, &this->m_Size);
+
+    comm.Isend(buffer.data(), buffer.size(), destRank, 0);
+}
+
+void RerouteMessage::BlockingSendTo(helper::Comm &comm, int destRank, std::vector<char> &buffer)
+{
+    size_t pos = 0;
+    buffer.resize(REROUTE_MESSAGE_SIZE);
+    helper::CopyToBuffer(buffer, pos, &this->m_MsgType);
+    helper::CopyToBuffer(buffer, pos, &this->m_SrcRank);
+    helper::CopyToBuffer(buffer, pos, &this->m_DestRank);
+    helper::CopyToBuffer(buffer, pos, &this->m_WildCard);
+    helper::CopyToBuffer(buffer, pos, &this->m_Offset);
+    helper::CopyToBuffer(buffer, pos, &this->m_Size);
+
+    comm.Send(buffer.data(), buffer.size(), destRank, 0);
+}
+
+void RerouteMessage::BlockingRecvFrom(helper::Comm &comm, int srcRank, std::vector<char> &buffer)
+{
+    buffer.resize(REROUTE_MESSAGE_SIZE);
+    comm.Recv(buffer.data(), REROUTE_MESSAGE_SIZE, srcRank, 0);
+
+    size_t pos = 0;
+    helper::CopyFromBuffer(buffer.data(), pos, &this->m_MsgType);
+    helper::CopyFromBuffer(buffer.data(), pos, &this->m_SrcRank);
+    helper::CopyFromBuffer(buffer.data(), pos, &this->m_DestRank);
+    helper::CopyFromBuffer(buffer.data(), pos, &this->m_WildCard);
+    helper::CopyFromBuffer(buffer.data(), pos, &this->m_Offset);
+    helper::CopyFromBuffer(buffer.data(), pos, &this->m_Size);
+}
+
+} // end namespace helper
+} // end namespace adios2
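The send and receive paths above pack the six fields in the same order, so a
quick round-trip check needs no MPI at all. A hypothetical snippet using only
the helpers this file already calls (field values invented):

    adios2::helper::RerouteMessage in, out;
    in.m_MsgType = adios2::helper::RerouteMessage::MessageType::DO_WRITE;
    in.m_SrcRank = 1;
    in.m_DestRank = 2;
    in.m_WildCard = 7;
    in.m_Offset = 4096;
    in.m_Size = 128;

    std::vector<char> buf(adios2::helper::RerouteMessage::REROUTE_MESSAGE_SIZE);
    size_t pos = 0;
    adios2::helper::CopyToBuffer(buf, pos, &in.m_MsgType);
    adios2::helper::CopyToBuffer(buf, pos, &in.m_SrcRank);
    adios2::helper::CopyToBuffer(buf, pos, &in.m_DestRank);
    adios2::helper::CopyToBuffer(buf, pos, &in.m_WildCard);
    adios2::helper::CopyToBuffer(buf, pos, &in.m_Offset);
    adios2::helper::CopyToBuffer(buf, pos, &in.m_Size);

    pos = 0;
    adios2::helper::CopyFromBuffer(buf.data(), pos, &out.m_MsgType);
    // ...remaining fields in the same order; out now equals in field-by-field.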
diff --git a/source/adios2/helper/adiosRerouting.h b/source/adios2/helper/adiosRerouting.h
new file mode 100644
index 0000000000..fdf59e4c67
--- /dev/null
+++ b/source/adios2/helper/adiosRerouting.h
@@ -0,0 +1,90 @@
+/*
+ * Distributed under the OSI-approved Apache License, Version 2.0. See
+ * accompanying file Copyright.txt for details.
+ *
+ * adiosRerouting.h helpers for BP5 rerouting aggregation
+ *
+ *  Created on: Sept 26, 2025
+ *      Author: Scott Wittenburg scott.wittenburg@kitware.com
+ */
+
+#ifndef ADIOS2_HELPER_ADIOSREROUTING_H_
+#define ADIOS2_HELPER_ADIOSREROUTING_H_
+
+#include "adios2/helper/adiosComm.h"
+
+/// \cond EXCLUDE_FROM_DOXYGEN
+#include <cstdint>
+#include <string>
+#include <vector>
+/// \endcond
+
+namespace adios2
+{
+
+namespace helper
+{
+
+class RerouteMessage
+{
+public:
+    enum class MessageType
+    {
+        DO_WRITE,
+        WRITER_CAPACITY,
+        WRITER_IDLE,
+        WRITE_MORE,
+        WRITE_COMPLETION,
+        REROUTE_REQUEST,
+        REROUTE_ACK,
+        REROUTE_REJECT,
+        GROUP_CLOSE,
+        GROUP_CLOSE_ACK,
+        STATUS_INQUIRY,
+        STATUS_REPLY,
+    };
+
+    std::string GetTypeString(MessageType mtype)
+    {
+        switch (mtype)
+        {
+        case MessageType::DO_WRITE:
+            return std::string("DO_WRITE");
+        case MessageType::WRITER_CAPACITY:
+            return std::string("WRITER_CAPACITY");
+        case MessageType::WRITER_IDLE:
+            return std::string("WRITER_IDLE");
+        case MessageType::WRITE_MORE:
+            return std::string("WRITE_MORE");
+        case MessageType::WRITE_COMPLETION:
+            return std::string("WRITE_COMPLETION");
+        case MessageType::REROUTE_REQUEST:
+            return std::string("REROUTE_REQUEST");
+        case MessageType::REROUTE_ACK:
+            return std::string("REROUTE_ACK");
+        case MessageType::REROUTE_REJECT:
+            return std::string("REROUTE_REJECT");
+        case MessageType::GROUP_CLOSE:
+            return std::string("GROUP_CLOSE");
+        case MessageType::GROUP_CLOSE_ACK:
+            return std::string("GROUP_CLOSE_ACK");
+        case MessageType::STATUS_INQUIRY:
+            return std::string("STATUS_INQUIRY");
+        case MessageType::STATUS_REPLY:
+            return std::string("STATUS_REPLY");
+        default:
+            return std::string("UNKNOWN");
+        }
+    }
+
+    // Send the contents of this message to another rank
+    void NonBlockingSendTo(helper::Comm &comm, int destRank, std::vector<char> &buffer);
+    void BlockingSendTo(helper::Comm &comm, int destRank, std::vector<char> &buffer);
+
+    // Receive a message from another rank to populate this message
+    void BlockingRecvFrom(helper::Comm &comm, int srcRank, std::vector<char> &buffer);
+
+    MessageType m_MsgType;
+    int m_SrcRank;
+    int m_DestRank;
+    int m_WildCard;
+    uint64_t m_Offset;
+    uint64_t m_Size;
+
+    static const size_t REROUTE_MESSAGE_SIZE = sizeof(MessageType) + sizeof(int) + sizeof(int) +
+                                               sizeof(int) + sizeof(uint64_t) + sizeof(uint64_t);
+};
+
+} // end namespace helper
+} // end namespace adios2
+
+#endif /* ADIOS2_HELPER_ADIOSREROUTING_H_ */
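Worked out, REROUTE_MESSAGE_SIZE is the packed size of the six fields in
declaration order. Assuming the usual 4-byte underlying type for the enum and a
4-byte int, that is 4 + 4 + 4 + 4 + 8 + 8 = 32 bytes, which a compile-time
check could pin down (hypothetical guard, not in the patch):

    static_assert(adios2::helper::RerouteMessage::REROUTE_MESSAGE_SIZE == 32,
                  "RerouteMessage wire size changed");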
diff --git a/testing/adios2/engine/bp/CMakeLists.txt b/testing/adios2/engine/bp/CMakeLists.txt
index 840817518d..a3e877f839 100644
--- a/testing/adios2/engine/bp/CMakeLists.txt
+++ b/testing/adios2/engine/bp/CMakeLists.txt
@@ -126,10 +126,13 @@ bp_gtest_add_tests_helper(LargeMetadata MPI_ALLOW)
 bp5_gtest_add_tests_helper(WriteStatsOnly MPI_ALLOW)
 
 if (ADIOS2_HAVE_MPI)
-  # Extra arguments: aggregation type, num subfiles, num timesteps, verbose level
+  # Extra arguments: aggregation type, num subfiles, num timesteps, verbose level, rerouting?
   gtest_add_tests_helper(DataSizeAggregate MPI_ONLY BP Engine.BP. .BP5.DSB
     WORKING_DIRECTORY ${BP5_DIR} EXTRA_ARGS "DataSizeBased" "3" "5" "2"
   )
+  gtest_add_tests_helper(DataSizeAggregate MPI_ONLY BP Engine.BP. .BP5.DSB.Rerouting
+    WORKING_DIRECTORY ${BP5_DIR} EXTRA_ARGS "DataSizeBased" "3" "5" "5" "WithRerouting"
+  )
 endif()
 
 set(BP5LargeMeta "Engine.BP.BPLargeMetadata.BPWrite1D_LargeMetadata.BP5.Serial")
@@ -245,8 +248,15 @@ macro(bp5agg_gtest_add_tests testname mpi)
     WORKING_DIRECTORY ${BP5_DIR} EXTRA_ARGS "BP5" "TwoLevelShm")
   gtest_add_tests_helper(${testname} ${mpi} BP Engine.BPAGG. .BP5.DSB
     WORKING_DIRECTORY ${BP5_DIR} EXTRA_ARGS "BP5" "DataSizeBased")
+  gtest_add_tests_helper(${testname} ${mpi} BP Engine.BPAGG. .BP5.DSB.RR
+    WORKING_DIRECTORY ${BP5_DIR} EXTRA_ARGS "BP5" "DataSizeBased" "WithRerouting")
 endmacro()
 
+if (ADIOS2_HAVE_MPI)
+  gtest_add_tests_helper(RerouteMessage MPI_ONLY BP Engine.BPReroute. .BP5.RR
+    WORKING_DIRECTORY ${BP5_DIR} EXTRA_ARGS "BP5")
+endif()
+
 # BP5 only for now, so test all the aggregators while we're at it
 bp5agg_gtest_add_tests(ParameterSelectSteps MPI_ALLOW)
 bp5agg_gtest_add_tests(AppendAfterSteps MPI_ALLOW)
diff --git a/testing/adios2/engine/bp/TestBPAppendAfterSteps.cpp b/testing/adios2/engine/bp/TestBPAppendAfterSteps.cpp
index 58e91332dd..046d7c6940 100644
--- a/testing/adios2/engine/bp/TestBPAppendAfterSteps.cpp
+++ b/testing/adios2/engine/bp/TestBPAppendAfterSteps.cpp
@@ -22,6 +22,7 @@ std::string engineName; // comes from command line
 
 std::string aggType = "TwoLevelShm"; // overridden on command line
+bool rerouting = false;              // overridden on command line
 const std::size_t Nx = 10;
 using DataArray = std::array<int32_t, Nx>;
@@ -88,9 +89,9 @@ TEST_P(BPAppendAfterStepsP, Test)
               << " steps, then appending " << nSteps << " steps again with parameter "
               << nAppendAfterSteps << std::endl;
 
-    std::string filename = "AppendAfterSteps_agg_" + aggType + "_N" + std::to_string(mpiSize) +
-                           "_Steps" + std::to_string(nSteps) + "_Append_" +
-                           std::to_string(nAppendAfterSteps) + ".bp";
+    std::string filename = "AppendAfterSteps_agg_" + aggType + "_RR" + (rerouting ? "Y" : "N") +
+                           "_N" + std::to_string(mpiSize) + "_Steps" + std::to_string(nSteps) +
+                           "_Append_" + std::to_string(nAppendAfterSteps) + ".bp";
     size_t totalNSteps = 0;
     {
@@ -98,6 +99,10 @@ TEST_P(BPAppendAfterStepsP, Test)
         adios2::IO ioWrite = adios.DeclareIO("TestIOWrite");
         ioWrite.SetEngine(engineName);
         ioWrite.SetParameter("AggregationType", aggType);
+
+        const char *rr = (rerouting ? "true" : "false");
+        ioWrite.SetParameter("EnableWriterRerouting", rr);
+
         adios2::Engine engine = ioWrite.Open(filename, adios2::Mode::Write);
         adios2::Dims shape{static_cast<size_t>(mpiSize * Nx)};
         adios2::Dims start{static_cast<size_t>(mpiRank * Nx)};
@@ -222,6 +227,15 @@ int main(int argc, char **argv)
         aggType = std::string(argv[2]);
     }
 
+    if (argc > 3)
+    {
+        std::string lastArg = std::string(argv[3]);
+        if (lastArg.compare("WithRerouting") == 0)
+        {
+            rerouting = true;
+        }
+    }
+
     result = RUN_ALL_TESTS();
 
 #if ADIOS2_USE_MPI
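The same trailing-argument check is repeated verbatim in each test's main()
below; if it spreads further, a small shared helper would remove the
duplication (hypothetical refactor sketch, not part of the patch):

    // Hypothetical helper: true when argv[idx] exists and equals "WithRerouting".
    static bool ParseReroutingFlag(int argc, char **argv, int idx)
    {
        return argc > idx && std::string(argv[idx]) == "WithRerouting";
    }

    // usage in a test main(): rerouting = ParseReroutingFlag(argc, argv, 3);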
"true" : "false"); + bpIO.SetParameter("EnableWriterRerouting", rr); + adios2::Variable varGlobalArray = bpIO.DefineVariable("GlobalArray", {globalNx, globalNy}); EXPECT_TRUE(varGlobalArray); - adios2::Engine bpWriter = bpIO.Open("unbalanced_output.bp", adios2::Mode::Write); + adios2::Engine bpWriter = bpIO.Open(filename, adios2::Mode::Write); for (size_t step = 0; step < nSteps; ++step) { @@ -153,7 +159,7 @@ TEST_F(DSATest, TestWriteUnbalancedData) adios2::IO io = adios.DeclareIO("ReadIO"); io.SetEngine("BPFile"); - adios2::Engine bpReader = io.Open("unbalanced_output.bp", adios2::Mode::ReadRandomAccess); + adios2::Engine bpReader = io.Open(filename, adios2::Mode::ReadRandomAccess); auto var_array = io.InquireVariable("GlobalArray"); EXPECT_TRUE(var_array); @@ -214,6 +220,14 @@ int main(int argc, char **argv) { verbose = std::string(argv[4]); } + if (argc > 5) + { + std::string lastArg = std::string(argv[5]); + if (lastArg.compare("WithRerouting") == 0) + { + rerouting = true; + } + } try { diff --git a/testing/adios2/engine/bp/TestBPDirectIO.cpp b/testing/adios2/engine/bp/TestBPDirectIO.cpp index a9390eb6f9..0822af53ba 100644 --- a/testing/adios2/engine/bp/TestBPDirectIO.cpp +++ b/testing/adios2/engine/bp/TestBPDirectIO.cpp @@ -15,6 +15,7 @@ std::string engineName; // comes from command line std::string aggType = "TwoLevelShm"; // comes from command line +bool rerouting = false; // overridden on command line class ADIOSReadDirectIOTest : public ::testing::Test { @@ -31,7 +32,7 @@ TEST_F(ADIOSReadDirectIOTest, BufferResize) int mpiRank = 0, mpiSize = 1; - std::string filename = "ADIOSDirectIO.agg-" + aggType; + std::string filename = "ADIOSDirectIO.agg-" + aggType + "_RR" + (rerouting ? "Y" : "N"); #if ADIOS2_USE_MPI MPI_Comm_rank(MPI_COMM_WORLD, &mpiRank); @@ -59,6 +60,9 @@ TEST_F(ADIOSReadDirectIOTest, BufferResize) // BufferChunkSize should be adjusted to 2*4096 by engine // StripeSize should be adjusted to 3*4096 by engine + const char *rr = (rerouting ? "true" : "false"); + ioWrite.SetParameter("EnableWriterRerouting", rr); + adios2::Engine engine = ioWrite.Open(filename, adios2::Mode::Write); // Number of elements per process const std::size_t Nx = 2000; @@ -145,6 +149,15 @@ int main(int argc, char **argv) aggType = std::string(argv[2]); } + if (argc > 3) + { + std::string lastArg = std::string(argv[3]); + if (lastArg.compare("WithRerouting") == 0) + { + rerouting = true; + } + } + int result = RUN_ALL_TESTS(); #if ADIOS2_USE_MPI MPI_Finalize(); diff --git a/testing/adios2/engine/bp/TestBPNewFileAppendMode.cpp b/testing/adios2/engine/bp/TestBPNewFileAppendMode.cpp index 4ba9b07b91..7390a1af7d 100644 --- a/testing/adios2/engine/bp/TestBPNewFileAppendMode.cpp +++ b/testing/adios2/engine/bp/TestBPNewFileAppendMode.cpp @@ -14,6 +14,7 @@ std::string engineName; // comes from command line std::string aggregationType = "EveryoneWritesSerial"; // comes from command line +bool rerouting = false; // overridden on command line class BPNewFileAppendMode : public ::testing::Test { @@ -43,6 +44,8 @@ TEST_F(BPNewFileAppendMode, ADIOS2BPNewFileAppendMode) fname += "-EW.bp"; } + fname += std::string("_RR") + (rerouting ? "Y" : "N"); + const size_t Nx = 6; adios2::ADIOS adios; @@ -64,6 +67,9 @@ TEST_F(BPNewFileAppendMode, ADIOS2BPNewFileAppendMode) io.SetParameter("AggregationType", aggregationType); io.SetParameter("NumAggregators", "0"); + const char *rr = (rerouting ? 
"true" : "false"); + io.SetParameter("EnableWriterRerouting", rr); + adios2::Engine engine = io.Open(fname, adios2::Mode::Append); engine.Close(); @@ -84,6 +90,15 @@ int main(int argc, char **argv) aggregationType = std::string(argv[2]); } + if (argc > 3) + { + std::string lastArg = std::string(argv[3]); + if (lastArg.compare("WithRerouting") == 0) + { + rerouting = true; + } + } + result = RUN_ALL_TESTS(); return result; } \ No newline at end of file diff --git a/testing/adios2/engine/bp/TestBPParameterSelectSteps.cpp b/testing/adios2/engine/bp/TestBPParameterSelectSteps.cpp index b467eb7ca4..1c9e0ac677 100644 --- a/testing/adios2/engine/bp/TestBPParameterSelectSteps.cpp +++ b/testing/adios2/engine/bp/TestBPParameterSelectSteps.cpp @@ -22,6 +22,7 @@ std::string engineName; // comes from command line std::string aggType = "TwoLevelShm"; // overridden on command line +bool rerouting = false; // overridden on command line int streamingFileId = 0; constexpr std::size_t NSteps = 10; const std::size_t Nx = 10; @@ -80,12 +81,16 @@ class BPParameterSelectSteps : public ::testing::Test #else adios2::ADIOS adios; #endif - OutputFileName = "ParameterSelectSteps_agg_" + aggType + "_id_" + - std::to_string(streamingFileId++) + "_size_" + std::to_string(mpiSize) + - ".bp"; + OutputFileName = "ParameterSelectSteps_agg_" + aggType + "_RR" + (rerouting ? "Y" : "N") + + "_id_" + std::to_string(streamingFileId++) + "_size_" + + std::to_string(mpiSize) + ".bp"; adios2::IO ioWrite = adios.DeclareIO("TestIOWrite"); ioWrite.SetEngine(engineName); ioWrite.SetParameter("AggregationType", aggType); + + const char *rr = (rerouting ? "true" : "false"); + ioWrite.SetParameter("EnableWriterRerouting", rr); + adios2::Engine engine = ioWrite.Open(OutputFileName, adios2::Mode::Write); // Number of elements per process const std::size_t Nx = 10; @@ -200,12 +205,16 @@ TEST_P(BPParameterSelectStepsP, Stream) adios2::ADIOS adios; #endif - std::string filename = "ParameterSelectStepsStream_agg_" + aggType + "_id_" + - std::to_string(streamingFileId++) + "_size_" + std::to_string(mpiSize) + - ".bp"; + std::string filename = "ParameterSelectStepsStream_agg_" + aggType + "_RR" + + (rerouting ? "Y" : "N") + "_id_" + std::to_string(streamingFileId++) + + "_size_" + std::to_string(mpiSize) + ".bp"; adios2::IO ioWrite = adios.DeclareIO("TestIOWrite"); ioWrite.SetEngine(engineName); ioWrite.SetParameter("AggregationType", aggType); + + const char *rr = (rerouting ? 
"true" : "false"); + ioWrite.SetParameter("EnableWriterRerouting", rr); + adios2::Engine writer = ioWrite.Open(filename, adios2::Mode::Write); adios2::IO ioRead = adios.DeclareIO("TestIORead"); @@ -320,6 +329,15 @@ int main(int argc, char **argv) aggType = std::string(argv[2]); } + if (argc > 3) + { + std::string lastArg = std::string(argv[3]); + if (lastArg.compare("WithRerouting") == 0) + { + rerouting = true; + } + } + result = RUN_ALL_TESTS(); #if ADIOS2_USE_MPI diff --git a/testing/adios2/engine/bp/TestBPReadMultithreaded.cpp b/testing/adios2/engine/bp/TestBPReadMultithreaded.cpp index 6c4140e249..056407c33f 100644 --- a/testing/adios2/engine/bp/TestBPReadMultithreaded.cpp +++ b/testing/adios2/engine/bp/TestBPReadMultithreaded.cpp @@ -22,6 +22,7 @@ std::string engineName; // comes from command line std::string aggType = "TwoLevelShm"; // overridden on command line +bool rerouting = false; // overridden on command line constexpr std::size_t NSteps = 4; const std::size_t Nx = 10; using DataArray = std::array; @@ -85,22 +86,27 @@ class BPReadMultithreadedTest : public ::testing::Test #else adios2::ADIOS adios; #endif + std::string rr(std::string("_RR") + (rerouting ? "Y" : "N")); std::string filename; if (stream) { - StreamOutputFileName = "BPReadMultithreaded_agg_" + aggType + "_size_" + + StreamOutputFileName = "BPReadMultithreaded_agg_" + aggType + rr + "_size_" + std::to_string(mpiSize) + "_Stream.bp"; filename = StreamOutputFileName; } else { - FileOutputFileName = "BPReadMultithreaded_agg_" + aggType + "_size_" + + FileOutputFileName = "BPReadMultithreaded_agg_" + aggType + rr + "_size_" + std::to_string(mpiSize) + "_File.bp"; filename = FileOutputFileName; } adios2::IO ioWrite = adios.DeclareIO("TestIOWrite"); ioWrite.SetEngine(engineName); ioWrite.SetParameter("AggregationType", aggType); + + const char *rrParam = (rerouting ? "true" : "false"); + ioWrite.SetParameter("EnableWriterRerouting", rrParam); + adios2::Engine engine = ioWrite.Open(filename, adios2::Mode::Write); // Number of elements per process const std::size_t Nx = 10; @@ -302,6 +308,15 @@ int main(int argc, char **argv) aggType = std::string(argv[2]); } + if (argc > 3) + { + std::string lastArg = std::string(argv[3]); + if (lastArg.compare("WithRerouting") == 0) + { + rerouting = true; + } + } + result = RUN_ALL_TESTS(); #if ADIOS2_USE_MPI diff --git a/testing/adios2/engine/bp/TestBPRerouteMessage.cpp b/testing/adios2/engine/bp/TestBPRerouteMessage.cpp new file mode 100644 index 0000000000..2eecd418ac --- /dev/null +++ b/testing/adios2/engine/bp/TestBPRerouteMessage.cpp @@ -0,0 +1,155 @@ +/* + * Distributed under the OSI-approved Apache License, Version 2.0. See + * accompanying file Copyright.txt for details. 
diff --git a/testing/adios2/engine/bp/TestBPRerouteMessage.cpp b/testing/adios2/engine/bp/TestBPRerouteMessage.cpp
new file mode 100644
index 0000000000..2eecd418ac
--- /dev/null
+++ b/testing/adios2/engine/bp/TestBPRerouteMessage.cpp
@@ -0,0 +1,155 @@
+/*
+ * Distributed under the OSI-approved Apache License, Version 2.0. See
+ * accompanying file Copyright.txt for details.
+ */
+
+#include "adios2/helper/adiosMemory.h"
+#include <adios2/helper/adiosComm.h>
+#include <adios2/helper/adiosCommMPI.h>
+#include <adios2/helper/adiosRerouting.h>
+#include <gtest/gtest.h>
+#include <mpi.h>
+#include <iostream>
+#include <sstream>
+#include <vector>
+
+using namespace adios2;
+
+namespace
+{
+int worldRank, worldSize;
+
+void SendAndReceiveMessage(helper::Comm &comm, int destRank, int srcRank)
+{
+    std::cout << "Sending to " << destRank << " and expecting to receive from: " << srcRank
+              << std::endl;
+
+    std::vector<char> sendBuffer;
+    std::vector<char> recvBuffer;
+
+    // Send a message to another rank
+    adios2::helper::RerouteMessage origMsg;
+    origMsg.m_MsgType = adios2::helper::RerouteMessage::MessageType::WRITER_IDLE;
+    origMsg.m_SrcRank = worldRank;
+    origMsg.m_DestRank = destRank;
+    origMsg.m_Offset = 2138;
+    origMsg.m_Size = 1213;
+    origMsg.NonBlockingSendTo(comm, destRank, sendBuffer);
+
+    int ready = 0;
+    helper::Comm::Status status;
+
+    while (!ready)
+    {
+        status = comm.Iprobe(static_cast<int>(helper::Comm::Constants::CommRecvAny), 0, &ready);
+    }
+
+    // Receive a message from another (any) rank
+    adios2::helper::RerouteMessage receivedMsg;
+    receivedMsg.BlockingRecvFrom(comm, status.Source, recvBuffer);
+
+    std::stringstream ss;
+
+    ss << "m_MsgType, orig = " << static_cast<int>(origMsg.m_MsgType)
+       << ", rcvd = " << static_cast<int>(receivedMsg.m_MsgType) << "\n";
+    ss << "m_SrcRank, orig = " << srcRank << ", rcvd = " << receivedMsg.m_SrcRank << "\n";
+    ss << "m_DestRank, orig = " << worldRank << ", rcvd = " << receivedMsg.m_DestRank << "\n";
+    ss << "m_Offset, orig = " << origMsg.m_Offset << ", rcvd = " << receivedMsg.m_Offset << "\n";
+    ss << "m_Size, orig = " << origMsg.m_Size << ", rcvd = " << receivedMsg.m_Size << "\n";
+
+    std::cout << ss.str();
+
+    ASSERT_EQ(receivedMsg.m_MsgType, origMsg.m_MsgType);
+    ASSERT_EQ(receivedMsg.m_SrcRank, srcRank);
+    ASSERT_EQ(receivedMsg.m_DestRank, worldRank);
+    ASSERT_EQ(receivedMsg.m_Offset, origMsg.m_Offset);
+    ASSERT_EQ(receivedMsg.m_Size, origMsg.m_Size);
+}
+}
+
+class RerouteTest : public ::testing::Test
+{
+public:
+    RerouteTest() = default;
+};
+
+TEST_F(RerouteTest, TestSendReceiveRoundRobin)
+{
+    helper::Comm comm = adios2::helper::CommDupMPI(MPI_COMM_WORLD);
+
+    int destRank = worldRank >= worldSize - 1 ? 0 : worldRank + 1;
+    int expectedSender = worldRank <= 0 ? worldSize - 1 : worldRank - 1;
+
+    SendAndReceiveMessage(comm, destRank, expectedSender);
+}
+
+TEST_F(RerouteTest, TestSendReceiveSelf)
+{
+    helper::Comm comm = adios2::helper::CommDupMPI(MPI_COMM_WORLD);
+
+    int destRank = worldRank;
+    int expectedSender = worldRank;
+
+    SendAndReceiveMessage(comm, destRank, expectedSender);
+}
+
+TEST_F(RerouteTest, TestSendReceiveBare)
+{
+    helper::Comm comm = adios2::helper::CommWithMPI(MPI_COMM_WORLD);
+
+    int sendToRank = worldRank >= worldSize - 1 ? 0 : worldRank + 1;
+    int recvFromRank = worldRank <= 0 ? worldSize - 1 : worldRank - 1;
+
+    std::vector<char> sendBuffer = {1, 2, 3};
+
+    comm.Isend(sendBuffer.data(), sendBuffer.size(), sendToRank, 0);
+
+    std::vector<char> recvBuffer;
+    recvBuffer.resize(sendBuffer.size());
+
+    helper::Comm::Status status = comm.Recv(recvBuffer.data(), recvBuffer.size(), recvFromRank, 0);
+
+    std::cout << "Rank " << comm.Rank() << " received " << status.Count << " elts" << std::endl;
+
+    ASSERT_EQ(recvBuffer.size(), sendBuffer.size());
+
+    for (size_t i = 0; i < recvBuffer.size(); ++i)
+    {
+        ASSERT_EQ(static_cast<int>(recvBuffer[i]), static_cast<int>(sendBuffer[i]));
+    }
+}
+
+TEST_F(RerouteTest, TestSendReceiveMoreBare)
+{
+    int sendToRank = worldRank >= worldSize - 1 ? 0 : worldRank + 1;
+    int recvFromRank = worldRank <= 0 ? worldSize - 1 : worldRank - 1;
+    const int count = 3;
+
+    // Send the buffer of chars (non-blocking)
+    MPI_Request request = MPI_REQUEST_NULL;
+    char sendBuffer[count] = {1, 2, 3};
+    MPI_Isend(&sendBuffer, count, MPI_CHAR, sendToRank, 0, MPI_COMM_WORLD, &request);
+
+    // Receive the buffer of chars (blocking)
+    MPI_Status status;
+    char recvBuffer[count];
+    MPI_Recv(&recvBuffer, count, MPI_CHAR, recvFromRank, 0, MPI_COMM_WORLD, &status);
+
+    for (int i = 0; i < count; ++i)
+    {
+        ASSERT_EQ(static_cast<int>(recvBuffer[i]), static_cast<int>(sendBuffer[i]));
+    }
+}
+
+int main(int argc, char **argv)
+{
+    MPI_Init(&argc, &argv);
+    MPI_Comm_rank(MPI_COMM_WORLD, &worldRank);
+    MPI_Comm_size(MPI_COMM_WORLD, &worldSize);
+    ::testing::InitGoogleTest(&argc, argv);
+
+    int result = RUN_ALL_TESTS();
+
+    MPI_Finalize();
+    return result;
+}
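SendAndReceiveMessage above demonstrates the intended consumption pattern: probe with the CommRecvAny wildcard until a message is pending, then receive from whichever rank the probe reported. A hypothetical service loop built from the same calls might dispatch on the message type like this (the switch body and the GROUP_CLOSE termination condition are invented for illustration, not taken from the engine):

#include <adios2/helper/adiosComm.h>
#include <adios2/helper/adiosRerouting.h>
#include <vector>

using adios2::helper::RerouteMessage;

// Hypothetical event loop: poll for a message from any rank, receive it,
// then dispatch on its type until told to shut down.
void ServiceRerouteMessages(adios2::helper::Comm &comm)
{
    std::vector<char> recvBuffer;
    bool done = false;
    while (!done)
    {
        int ready = 0;
        adios2::helper::Comm::Status status;
        while (!ready)
        {
            status = comm.Iprobe(
                static_cast<int>(adios2::helper::Comm::Constants::CommRecvAny), 0, &ready);
        }

        RerouteMessage msg;
        msg.BlockingRecvFrom(comm, status.Source, recvBuffer);

        switch (msg.m_MsgType)
        {
        case RerouteMessage::MessageType::DO_WRITE:
            // e.g. write msg.m_Size bytes at msg.m_Offset in the subfile
            break;
        case RerouteMessage::MessageType::GROUP_CLOSE:
            done = true;
            break;
        default:
            break;
        }
    }
}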
diff --git a/testing/adios2/engine/bp/TestBPStepsFileGlobalArray.cpp b/testing/adios2/engine/bp/TestBPStepsFileGlobalArray.cpp
index 8f53e734ff..97261c4e57 100644
--- a/testing/adios2/engine/bp/TestBPStepsFileGlobalArray.cpp
+++ b/testing/adios2/engine/bp/TestBPStepsFileGlobalArray.cpp
@@ -18,6 +18,7 @@
 std::string engineName; // comes from command line
 std::string aggType = ""; // overridden on command line
+bool rerouting = false;   // overridden on command line
 
 // Number of elements per process
 const std::size_t Nx = 10;
@@ -109,6 +110,18 @@ std::string AggregationTypeAlias(const std::string &aggregationType)
     }
 }
 
+std::string ReroutingPart(const std::string &aggregationType, bool rr)
+{
+    if (aggregationType == "")
+    {
+        return "";
+    }
+
+    std::string ret("_RR");
+    ret += (rr ? "Y" : "N");
+    return ret;
+}
+
 class BPStepsFileGlobalArrayReaders : public BPStepsFileGlobalArray,
                                       public ::testing::WithParamInterface<ReadMode>
 {
@@ -121,7 +134,7 @@ TEST_P(BPStepsFileGlobalArrayReaders, EveryStep)
 {
     const ReadMode readMode = GetReadMode();
     std::string fname_prefix = "BPStepsFileGlobalArray.EveryStep." + ReadModeToString(readMode) +
-                               AggregationTypeAlias(aggType);
+                               AggregationTypeAlias(aggType) + ReroutingPart(aggType, rerouting);
 
     int mpiRank = 0, mpiSize = 1;
     const std::size_t NSteps = 4;
@@ -159,7 +172,10 @@
     if (aggType != "")
     {
         io.SetParameter("AggregationType", aggType);
+        const char *rr = (rerouting ? "true" : "false");
+        io.SetParameter("EnableWriterRerouting", rr);
     }
+
     adios2::Engine engine = io.Open(fname, adios2::Mode::Write);
 
     auto var_i32 = io.DefineVariable<int32_t>("i32", shape, start, count);
@@ -380,7 +396,8 @@ TEST_P(BPStepsFileGlobalArrayReaders, NewVarPerStep)
 {
     const ReadMode readMode = GetReadMode();
     std::string fname_prefix = "BPStepsFileGlobalArray.NewVarPerStep." +
-                               ReadModeToString(readMode) + AggregationTypeAlias(aggType);
+                               ReadModeToString(readMode) + AggregationTypeAlias(aggType) +
+                               ReroutingPart(aggType, rerouting);
 
     int mpiRank = 0, mpiSize = 1;
     const std::size_t NSteps = 4;
@@ -420,7 +437,10 @@
     if (aggType != "")
     {
         io.SetParameter("AggregationType", aggType);
+        const char *rr = (rerouting ? "true" : "false");
+        io.SetParameter("EnableWriterRerouting", rr);
     }
+
     adios2::Engine engine = io.Open(fname, adios2::Mode::Write);
 
     for (int step = 0; step < static_cast<int>(NSteps); ++step)
@@ -667,7 +687,8 @@ TEST_P(BPStepsFileGlobalArrayParameters, EveryOtherStep)
 {
     const ReadMode readMode = GetReadMode();
     std::string fname_prefix = "BPStepsFileGlobalArray.EveryOtherStep.Steps" +
                                std::to_string(NSteps) + ".Oddity" + std::to_string(Oddity) + "." +
-                               ReadModeToString(readMode) + AggregationTypeAlias(aggType);
+                               ReadModeToString(readMode) + AggregationTypeAlias(aggType) +
+                               ReroutingPart(aggType, rerouting);
 
     int mpiRank = 0, mpiSize = 1;
 #if ADIOS2_USE_MPI
@@ -708,7 +729,10 @@
     if (aggType != "")
     {
         io.SetParameter("AggregationType", aggType);
+        const char *rr = (rerouting ? "true" : "false");
+        io.SetParameter("EnableWriterRerouting", rr);
     }
+
     adios2::Engine engine = io.Open(fname, adios2::Mode::Write);
 
     auto var_i32 = io.DefineVariable<int32_t>("i32", shape, start, count);
@@ -982,6 +1006,15 @@
         aggType = std::string(argv[2]);
     }
 
+    if (argc > 3)
+    {
+        std::string lastArg = std::string(argv[3]);
+        if (lastArg.compare("WithRerouting") == 0)
+        {
+            rerouting = true;
+        }
+    }
+
     result = RUN_ALL_TESTS();
 
 #if ADIOS2_USE_MPI