Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
37 commits
Select commit Hold shift + click to select a range
c50d043
bp5writer: defer writer subfile map generation
scottwittenburg Nov 6, 2025
4f47ee3
bp5writer: InitTransports() allow opening a file later
scottwittenburg Nov 6, 2025
db61893
Add a RerouteMessage class with send/recv capability
scottwittenburg Sep 26, 2025
624cb90
Start working on the rerouting code path
scottwittenburg Sep 26, 2025
afdc1c1
Work on rerouting aggregator
scottwittenburg Oct 2, 2025
ac74c54
Test passes except when it hangs, no rerouting yet
scottwittenburg Oct 2, 2025
2bc1d77
Fix race condition
scottwittenburg Oct 3, 2025
bb4f632
Fix failing tests when using the new rerouting feature
scottwittenburg Oct 3, 2025
bc221bb
Update bp5 only tests for rerouting, and try rerouting by default
scottwittenburg Oct 3, 2025
cd35217
run clang format
scottwittenburg Oct 3, 2025
69cf99b
fix a couple warnings
scottwittenburg Oct 3, 2025
ba5ea7f
fix some type mismatches, comment debug stmts
scottwittenburg Oct 6, 2025
a915a70
run clang format
scottwittenburg Oct 6, 2025
bf8db40
fix unused variable warning
scottwittenburg Oct 6, 2025
9aaa3ac
style fix
scottwittenburg Oct 7, 2025
908be4a
Try to fix issues sending to and receiving from self
scottwittenburg Oct 7, 2025
cda5fe4
run clang format
scottwittenburg Oct 7, 2025
8bd0feb
Try to fix round-robin send/recv on OpenMPI
scottwittenburg Oct 7, 2025
c7ff858
avoid changing filename for BP3 invocation
scottwittenburg Oct 7, 2025
130fd9e
WIP: see if it is just the message type
scottwittenburg Oct 8, 2025
5234602
WIP: add debug stmts
scottwittenburg Oct 8, 2025
74f1e69
remove one layer of casting in Isend
scottwittenburg Oct 9, 2025
c05d036
simplify the send/recv bare test
scottwittenburg Oct 9, 2025
8458353
clang format
scottwittenburg Oct 29, 2025
2f58b31
fix compile warnings and errors
scottwittenburg Oct 29, 2025
d5fff62
Introduce STATUS_INQUIRY and STATUS_REPLY messages
scottwittenburg Nov 4, 2025
e317ca5
Abstractions to help global coord manage subcoord state
scottwittenburg Nov 5, 2025
0d5a885
Add messages and state management needed for actual re-routing
scottwittenburg Nov 6, 2025
c6c1664
run clang format
scottwittenburg Nov 6, 2025
d66bfe0
Debug/fix cycles
scottwittenburg Nov 7, 2025
ad0c1ad
More debug/fix cycles
scottwittenburg Nov 8, 2025
4b039ec
run clang-format
scottwittenburg Nov 8, 2025
67f86a6
Fix a few bugs and adjust debug logs
scottwittenburg Dec 2, 2025
87aaec6
Fix subtle bug that could cause blocks to disappear
scottwittenburg Dec 2, 2025
4c7a645
run clang format
scottwittenburg Dec 2, 2025
37bd224
Fix some compiler warnings
scottwittenburg Dec 3, 2025
8586d6d
Debug stmts break tests, disable them
scottwittenburg Dec 3, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions source/adios2/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ add_library(adios2_core
helper/adiosNetwork.cpp
helper/adiosPartitioner.cpp
helper/adiosPluginManager.cpp
helper/adiosRerouting.cpp
helper/adiosString.cpp helper/adiosString.tcc
helper/adiosSystem.cpp
helper/adiosType.cpp
Expand All @@ -57,6 +58,7 @@ add_library(adios2_core
engine/bp5/BP5Writer.tcc
engine/bp5/BP5Writer_TwoLevelShm_Async.cpp
engine/bp5/BP5Writer_TwoLevelShm.cpp
engine/bp5/BP5Writer_WithRerouting.cpp

engine/timeseries/TimeSeriesReader.cpp engine/timeseries/TimeSeriesReader.tcc

Expand Down
3 changes: 2 additions & 1 deletion source/adios2/engine/bp5/BP5Engine.h
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,8 @@ class BP5Engine
MACRO(DirectIO, Bool, bool, false) \
MACRO(DirectIOAlignOffset, UInt, unsigned int, 512) \
MACRO(DirectIOAlignBuffer, UInt, unsigned int, 0) \
MACRO(AggregationType, AggregationType, int, (int)AggregationType::TwoLevelShm) \
MACRO(AggregationType, AggregationType, int, (int)AggregationType::DataSizeBased) \
MACRO(EnableWriterRerouting, Bool, bool, true) \
MACRO(AsyncOpen, Bool, bool, true) \
MACRO(AsyncWrite, AsyncWrite, int, (int)AsyncWrite::Sync) \
MACRO(GrowthFactor, Float, float, DefaultBufferGrowthFactor) \
Expand Down
195 changes: 110 additions & 85 deletions source/adios2/engine/bp5/BP5Writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -115,15 +115,15 @@ helper::RankPartition BP5Writer::GetPartitionInfo(const uint64_t rankDataSize, c

m_Profiler.AddTimerWatch("PartitionRanks");
m_Profiler.Start("PartitionRanks");
helper::Partitioning partitioning = helper::PartitionRanks(allsizes, numPartitions);
m_Partitioning = helper::PartitionRanks(allsizes, numPartitions);
m_Profiler.Stop("PartitionRanks");

if (parentRank == 0 && m_Parameters.verbose > 0)
{
partitioning.PrintSummary();
m_Partitioning.PrintSummary();
}

return partitioning.FindPartition(parentRank);
return m_Partitioning.FindPartition(parentRank);
}

StepStatus BP5Writer::BeginStep(StepMode mode, const float timeoutSeconds)
Expand Down Expand Up @@ -390,9 +390,32 @@ void BP5Writer::WriteData(format::BufferV *Data)
WriteData_EveryoneWrites(Data, false);
break;
case (int)AggregationType::EveryoneWritesSerial:
case (int)AggregationType::DataSizeBased:
WriteData_EveryoneWrites(Data, true);
break;
case (int)AggregationType::DataSizeBased:
// First initialize aggregator and transports if we haven't done it yet this step
if (!m_AggregatorInitializedThisStep)
{
// We can't allow ranks to change subfiles between calls to Put(), so we only
// do this initialization once per timestep. Consequently, partition decision
// could be based on incomplete step data.
InitAggregator(Data->Size());
InitTransports();
m_AggregatorInitializedThisStep = true;
}

// For rerouting to be useful, there must be multiple writers sending
// data to multiple subfiles.
if (m_Parameters.EnableWriterRerouting && m_Comm.Size() > 1 &&
m_Aggregator->m_SubStreams > 1)
{
WriteData_WithRerouting(Data);
}
else
{
WriteData_EveryoneWrites(Data, true);
}
break;
case (int)AggregationType::TwoLevelShm:
WriteData_TwoLevelShm(Data);
break;
Expand All @@ -410,19 +433,6 @@ void BP5Writer::WriteData(format::BufferV *Data)

void BP5Writer::WriteData_EveryoneWrites(format::BufferV *Data, bool SerializedWriters)
{
if (m_Parameters.AggregationType == (int)AggregationType::DataSizeBased)
{
if (!m_AggregatorInitializedThisStep)
{
// We can't allow ranks to change subfiles between calls to Put(), so we only
// do this initialization once per timestep. Consequently, partition decision
// could be based on incomplete step data.
InitAggregator(Data->Size());
InitTransports();
m_AggregatorInitializedThisStep = true;
}
}

const aggregator::MPIChain *a = dynamic_cast<aggregator::MPIChain *>(m_Aggregator);

// new step writing starts at offset m_DataPos on aggregator
Expand Down Expand Up @@ -1020,6 +1030,33 @@ void BP5Writer::EndStep()

m_Profiler.Stop("ES_AWD");

if (m_Parameters.verbose > 2)
{
std::cout << "Rank " << m_Comm.Rank() << " deciding whether new writer map is needed"
<< std::endl;
std::cout << " m_WriterStep: " << m_WriterStep << std::endl;
std::cout << " m_AppendWriterCount: " << m_AppendWriterCount
<< ", m_Comm.Size(): " << m_Comm.Size() << std::endl;
std::cout << " m_AppendAggregatorCount: " << m_AppendAggregatorCount
<< ", m_Aggregator->m_NumAggregators: " << m_Aggregator->m_NumAggregators
<< std::endl;
std::cout << " m_AppendSubfileCount: " << m_AppendSubfileCount
<< ", m_Aggregator->m_SubStreams: " << m_Aggregator->m_SubStreams << std::endl;
}

if (!m_WriterStep || m_AppendWriterCount != static_cast<unsigned int>(m_Comm.Size()) ||
m_AppendAggregatorCount != static_cast<unsigned int>(m_Aggregator->m_NumAggregators) ||
m_AppendSubfileCount != static_cast<unsigned int>(m_Aggregator->m_SubStreams))
{
// new Writer Map is needed
if (m_Parameters.verbose > 2)
{
std::cout << "Rank " << m_Comm.Rank() << " new writer map needed" << std::endl;
}
const uint64_t a = static_cast<uint64_t>(m_Aggregator->m_SubStreamIndex);
m_WriterSubfileMap = m_Comm.GatherValues(a, 0);
}

if (m_Parameters.UseSelectiveMetadataAggregation)
{
SelectiveAggregationMetadata(TSInfo);
Expand Down Expand Up @@ -1609,29 +1646,9 @@ void BP5Writer::InitMetadataTransports()

void BP5Writer::InitTransports()
{
std::string cacheKey = GetCacheKey(m_Aggregator);
auto search = m_AggregatorSpecifics.find(cacheKey);
bool cacheHit = false;
OpenSubfile();

if (search != m_AggregatorSpecifics.end())
{
if (m_Parameters.verbose > 2)
{
std::cout << "Rank " << m_Comm.Rank() << " cache hit for aggregator key " << cacheKey
<< std::endl;
}
cacheHit = true;
}
else
{
// Didn't have one in the cache, add it now
m_AggregatorSpecifics.emplace(std::make_pair(cacheKey, AggTransportData(m_IO, m_Comm)));
}

AggTransportData &aggData = m_AggregatorSpecifics.at(cacheKey);

// /path/name.bp.dir/name.bp.rank
aggData.m_SubStreamNames = GetBPSubStreamNames({m_Name}, m_Aggregator->m_SubStreamIndex);
AggTransportData &aggData = m_AggregatorSpecifics.at(GetCacheKey(m_Aggregator));

if (m_IAmDraining)
{
Expand All @@ -1658,9 +1675,49 @@ void BP5Writer::InitTransports()
aggData.m_DrainSubStreamNames, m_IO.m_TransportsParameters, m_Parameters.NodeLocal);
}

if (m_IAmDraining)
{
if (m_DrainBB)
{
for (const auto &name : aggData.m_DrainSubStreamNames)
{
m_FileDrainer.AddOperationOpen(name, m_OpenMode);
}
}
}

InitBPBuffer();
}

void BP5Writer::OpenSubfile(bool useComm, bool forceAppend)
{
std::string cacheKey = GetCacheKey(m_Aggregator);
auto search = m_AggregatorSpecifics.find(cacheKey);
bool cacheHit = false;

if (search != m_AggregatorSpecifics.end())
{
if (m_Parameters.verbose > 2)
{
std::cout << "Rank " << m_Comm.Rank() << " cache hit for aggregator key " << cacheKey
<< std::endl;
}
cacheHit = true;
}
else
{
// Didn't have one in the cache, add it now
m_AggregatorSpecifics.emplace(std::make_pair(cacheKey, AggTransportData(m_IO, m_Comm)));
}

AggTransportData &aggData = m_AggregatorSpecifics.at(cacheKey);

// /path/name.bp.dir/name.bp.rank
aggData.m_SubStreamNames = GetBPSubStreamNames({m_Name}, m_Aggregator->m_SubStreamIndex);

helper::Comm openSyncComm;

if (m_Parameters.AggregationType == (int)AggregationType::DataSizeBased)
if (m_Parameters.AggregationType == (int)AggregationType::DataSizeBased && useComm)
{
// Split my writer chain so only ranks that actually need to open a
// file can do so in an ordered fashion.
Expand All @@ -1679,7 +1736,7 @@ void BP5Writer::InitTransports()

if (m_Parameters.AggregationType == (int)AggregationType::DataSizeBased)
{
if (m_WriterStep > 0)
if (m_WriterStep > 0 || forceAppend)
{
// override the mode to be append if we're opening a file that
// was already opened by another rank.
Expand All @@ -1691,29 +1748,24 @@ void BP5Writer::InitTransports()
{
std::cout << "Rank " << m_Comm.Rank() << " opening data file" << std::endl;
}
aggData.m_FileDataManager.OpenFiles(aggData.m_SubStreamNames, mode,
m_IO.m_TransportsParameters, true,
*DataWritingComm);
if (useComm)
{
aggData.m_FileDataManager.OpenFiles(aggData.m_SubStreamNames, mode,
m_IO.m_TransportsParameters, true,
*DataWritingComm);
}
else
{
aggData.m_FileDataManager.OpenFiles(aggData.m_SubStreamNames, mode,
m_IO.m_TransportsParameters, true);
}
}
}

if (m_Parameters.AggregationType == (int)AggregationType::DataSizeBased)
if (m_Parameters.AggregationType == (int)AggregationType::DataSizeBased && useComm)
{
openSyncComm.Free();
}

if (m_IAmDraining)
{
if (m_DrainBB)
{
for (const auto &name : aggData.m_DrainSubStreamNames)
{
m_FileDrainer.AddOperationOpen(name, m_OpenMode);
}
}
}

this->InitBPBuffer();
}

/*generate the header for the metadata index file*/
Expand Down Expand Up @@ -1963,33 +2015,6 @@ void BP5Writer::InitBPBuffer()
{
m_WriterDataPos.resize(m_Comm.Size());
}

if (m_Parameters.verbose > 2)
{
std::cout << "Rank " << m_Comm.Rank() << " deciding whether new writer map is needed"
<< std::endl;
std::cout << " m_WriterStep: " << m_WriterStep << std::endl;
std::cout << " m_AppendWriterCount: " << m_AppendWriterCount
<< ", m_Comm.Size(): " << m_Comm.Size() << std::endl;
std::cout << " m_AppendAggregatorCount: " << m_AppendAggregatorCount
<< ", m_Aggregator->m_NumAggregators: " << m_Aggregator->m_NumAggregators
<< std::endl;
std::cout << " m_AppendSubfileCount: " << m_AppendSubfileCount
<< ", m_Aggregator->m_SubStreams: " << m_Aggregator->m_SubStreams << std::endl;
}

if (!m_WriterStep || m_AppendWriterCount != static_cast<unsigned int>(m_Comm.Size()) ||
m_AppendAggregatorCount != static_cast<unsigned int>(m_Aggregator->m_NumAggregators) ||
m_AppendSubfileCount != static_cast<unsigned int>(m_Aggregator->m_SubStreams))
{
// new Writer Map is needed, generate now, write later
if (m_Parameters.verbose > 2)
{
std::cout << "Rank " << m_Comm.Rank() << " new writer map needed" << std::endl;
}
const uint64_t a = static_cast<uint64_t>(m_Aggregator->m_SubStreamIndex);
m_WriterSubfileMap = m_Comm.GatherValues(a, 0);
}
}

void BP5Writer::EnterComputationBlock() noexcept
Expand Down
14 changes: 14 additions & 0 deletions source/adios2/engine/bp5/BP5Writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,8 @@ class BP5Writer : public BP5Engine, public core::Engine
std::map<std::string, AggTransportData> m_AggregatorSpecifics;
helper::RankPartition GetPartitionInfo(const uint64_t rankDataSize, const int subStreams,
helper::Comm const &parentComm);
void OpenSubfile(const bool useComm = true, const bool forceAppend = false);
helper::Partitioning m_Partitioning;

/** Single object controlling BP buffering */
format::BP5Serializer m_BP5Serializer;
Expand Down Expand Up @@ -196,6 +198,7 @@ class BP5Writer : public BP5Engine, public core::Engine
/** Write Data to disk, in an aggregator chain */
void WriteData(format::BufferV *Data);
void WriteData_EveryoneWrites(format::BufferV *Data, bool SerializedWriters);
void WriteData_WithRerouting(format::BufferV *Data);
void WriteData_EveryoneWrites_Async(format::BufferV *Data, bool SerializedWriters);
void WriteData_TwoLevelShm(format::BufferV *Data);
void WriteData_TwoLevelShm_Async(format::BufferV *Data);
Expand Down Expand Up @@ -302,6 +305,17 @@ class BP5Writer : public BP5Engine, public core::Engine
*/
uint64_t CountStepsInMetadataIndex(format::BufferSTL &bufferSTL);

// Thread function for inter-rank communication when rerouting aggregation
// is enabled
void ReroutingCommunicationLoop();
int m_TargetIndex;
int m_TargetCoordinator;
std::mutex m_WriteMutex;
std::mutex m_NotifMutex;
std::condition_variable m_WriteCV;
bool m_ReadyToWrite;
bool m_FinishedWriting;

/* Async write's future */
std::future<int> m_WriteFuture;
// variables to delay writing to index file
Expand Down
Loading
Loading