Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 0 additions & 2 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,6 @@ project(replay_testing)
find_package(ament_cmake REQUIRED)
find_package(ament_cmake_python REQUIRED)

include(cmake/install_mcap.cmake)

# Install Python modules and entry points
ament_python_install_package(${PROJECT_NAME}
SCRIPTS_DESTINATION lib/${PROJECT_NAME}
Expand Down
48 changes: 0 additions & 48 deletions cmake/install_mcap.cmake

This file was deleted.

49 changes: 32 additions & 17 deletions replay_testing/filter.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,23 +13,38 @@
# limitations under the License.
#

import subprocess
import rosbag2_py


def filter_mcap(input, output, output_topics):
command = [
'mcap',
'filter',
input,
'-o',
output,
]

# Filter out topics that will be replayed
for topic in output_topics:
command.extend(['-n', topic])

try:
subprocess.run(command, check=True)
except subprocess.CalledProcessError as e:
print(f'Error filtering {input}: {e}')
"""Filter out specified topics from an mcap file.

Args:
input: Path to input mcap file
output: Path to output mcap file
output_topics: List of topic names to exclude from the output
"""
topics_to_exclude = set(output_topics)

reader = rosbag2_py.SequentialReader()
reader.open(
rosbag2_py.StorageOptions(uri=str(input), storage_id='mcap'),
rosbag2_py.ConverterOptions(input_serialization_format='cdr', output_serialization_format='cdr'),
)

writer = rosbag2_py.SequentialWriter()
writer.open(
rosbag2_py.StorageOptions(uri=output, storage_id='mcap'),
rosbag2_py.ConverterOptions(input_serialization_format='cdr', output_serialization_format='cdr'),
)

# Create topics in writer, excluding filtered ones
for topic_metadata in reader.get_all_topics_and_types():
if topic_metadata.name not in topics_to_exclude:
writer.create_topic(topic_metadata)

# Copy messages, excluding filtered topics
while reader.has_next():
topic_name, data, timestamp = reader.read_next()
if topic_name not in topics_to_exclude:
writer.write(topic_name, data, timestamp)
75 changes: 75 additions & 0 deletions test/test_filter.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
# Copyright (c) 2025-present Polymath Robotics, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

from pathlib import Path

import rosbag2_py

from replay_testing.filter import filter_mcap

FIXTURES_DIR = Path(__file__).parent / 'fixtures'
CMD_VEL_MCAP = FIXTURES_DIR / 'cmd_vel_only.mcap'


def get_mcap_topics(mcap_path: Path) -> list[str]:
"""Get list of topics from an mcap file."""
reader = rosbag2_py.SequentialReader()
reader.open(
rosbag2_py.StorageOptions(uri=str(mcap_path), storage_id='mcap'),
rosbag2_py.ConverterOptions(input_serialization_format='cdr', output_serialization_format='cdr'),
)
return [topic.name for topic in reader.get_all_topics_and_types()]


def test_filter_mcap_creates_output_file(tmp_path):
"""Test that filter_mcap creates an output file."""
output_path = tmp_path / 'filtered.mcap'

filter_mcap(
input=str(CMD_VEL_MCAP),
output=str(output_path),
output_topics=['/vehicle/cmd_vel'],
)

assert output_path.exists()


def test_filter_mcap_preserves_matching_topic(tmp_path):
"""Test that filter_mcap preserves messages matching the filter."""
output_path = tmp_path / 'filtered.mcap'

filter_mcap(
input=str(CMD_VEL_MCAP),
output=str(output_path),
output_topics=['/vehicle/cmd_vel'],
)

topics = get_mcap_topics(output_path)
assert '/vehicle/cmd_vel' not in topics


def test_filter_mcap_empty_topics_creates_empty_mcap(tmp_path):
"""Test that filtering with non-matching topic creates mcap with no messages."""
output_path = tmp_path / 'filtered.mcap'

filter_mcap(
input=str(CMD_VEL_MCAP),
output=str(output_path),
output_topics=['/nonexistent/topic'],
)

assert output_path.exists()
topics = get_mcap_topics(output_path)
assert '/vehicle/cmd_vel' in topics