Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
65 changes: 42 additions & 23 deletions replay_testing/fixtures/nexus.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
# limitations under the License.
#

import base64
import os
import subprocess
from pathlib import Path
Expand All @@ -28,11 +27,6 @@
class NexusFixture(BaseFixture):
"""Fixture provider that downloads MCAP files from Nexus repository."""

NEXUS_CI_USERNAME = os.getenv('NEXUS_CI_USERNAME', '')
NEXUS_CI_PASSWORD = os.getenv('NEXUS_CI_PASSWORD', '')
NEXUS_SERVER = os.getenv('NEXUS_SERVER', '')
NEXUS_REPOSITORY = os.getenv('NEXUS_REPOSITORY', 'rosbag-hosted')

def __init__(self, path: str):
self.nexus_path = path

Expand All @@ -47,19 +41,17 @@ def download(self, destination_folder: Path) -> Mcap:
Mcap: A Mcap object with paths
to downloaded files
"""

if self.NEXUS_CI_USERNAME == 'ci':
decoded_password = base64.b64decode(self.NEXUS_CI_PASSWORD).decode().strip()
else:
decoded_password = self.NEXUS_CI_PASSWORD
_logger_.info(f'NEXUS_SERVER: {self.NEXUS_SERVER}')
_logger_.info(f'NEXUS_REPOSITORY: {self.NEXUS_REPOSITORY}')
# Read environment variables at runtime, not at class definition time
username = os.getenv('NEXUS_USERNAME', '')
password = os.getenv('NEXUS_PASSWORD', '')
server = os.getenv('NEXUS_SERVER', '')
repo = os.getenv('NEXUS_REPOSITORY', '')
extra_headers = os.getenv('NEXUS_EXTRA_HEADERS', '')
_logger_.info(f'NEXUS_SERVER: {server}')
_logger_.info(f'NEXUS_REPOSITORY: {repo}')
_logger_.info(f'NEXUS_USERNAME: {username}')
_logger_.info(f'Downloading {self.nexus_path} to {destination_folder}')

server = self.NEXUS_SERVER
repo = self.NEXUS_REPOSITORY
username = self.NEXUS_CI_USERNAME

nexus_filename = self.nexus_path.split('/')[-1]

curl_dest = destination_folder / nexus_filename
Expand All @@ -68,20 +60,47 @@ def download(self, destination_folder: Path) -> Mcap:
'curl',
'-v',
'-u',
f'{username}:{decoded_password}',
f'{username}:{password}',
'-sL',
'-o',
str(curl_dest),
'-w',
'%{http_code}',
f'{server}/repository/{repo}/{self.nexus_path}',
]

if extra_headers:
for header in extra_headers.split(';'):
header = header.strip()
if header:
curl_command.extend(['-H', header])

result = subprocess.run(curl_command, capture_output=True, text=True)

if result.returncode == 0:
_logger_.info(f'Download successful: {curl_dest}')
return Mcap(path=curl_dest)
else:
http_code = result.stdout.strip()

if result.returncode != 0:
_logger_.error(f'Download failed for {self.nexus_path}')
_logger_.error(f'STDOUT: {result.stdout}')
_logger_.error(f'HTTP status code: {http_code}')
_logger_.error(f'STDERR: {result.stderr}')
raise RuntimeError(f'Failed to download fixture from Nexus: {self.nexus_path}')

if not http_code.startswith('2'):
_logger_.error(f'Download failed for {self.nexus_path}')
_logger_.error(f'HTTP status code: {http_code}')
_logger_.error(f'STDERR: {result.stderr}')
raise RuntimeError(f'Failed to download fixture from Nexus: {self.nexus_path} (HTTP {http_code})')

# Verify the downloaded file is an MCAP by checking magic bytes
mcap_magic = b'\x89MCAP0\r\n'
with Path.open(curl_dest, 'rb') as f:
file_header = f.read(len(mcap_magic))
if file_header != mcap_magic:
_logger_.error(f'Downloaded file is not a valid MCAP: {curl_dest}')
_logger_.error(f'Expected magic bytes: {mcap_magic!r}, got: {file_header!r}')
raise RuntimeError(
f'Downloaded file is not a valid MCAP (possibly a Cloudflare challenge page): {self.nexus_path}'
)

_logger_.info(f'Download successful: {curl_dest} (HTTP {http_code})')
return Mcap(path=curl_dest)