55 changes: 43 additions & 12 deletions config.yml
@@ -1,5 +1,48 @@
globus:
globus_endpoints:

# 7.0.1.1 ENDPOINTS

nersc7011_alsdev:
root_path: /global/cfs/cdirs/als/data_mover/7.0.1.1/
uri: nersc.gov
uuid: d40248e6-d874-4f7b-badd-2c06c16f1a58
name: nersc7011_alsdev

nersc7011_alsdev_raw:
root_path: /global/cfs/cdirs/als/data_mover/7.0.1.1/raw
uri: nersc.gov
uuid: d40248e6-d874-4f7b-badd-2c06c16f1a58
name: nersc7011_alsdev_raw

qnap7011:
root_path: /
uri: qnap7011.lbl.gov
uuid: tbd
name: qnap7011

qnap7011_raw:
root_path: /data/raw
uri: qnap7011.lbl.gov
uuid: tbd
name: qnap7011_raw

# 7.0.1.2 ENDPOINTS

nersc7012:
root_path: /global/cfs/cdirs/als/gsharing/data_mover/7012
uri: nersc.gov
uuid: df82346e-9a15-11ea-b3c4-0ae144191ee3
name: nersc7012

data7012:
root_path: /
uri: hpc.lbl.gov
uuid: 639c49be-604f-423c-9c5d-82a53afe1bf1
name: data7012

# 8.3.2 ENDPOINTS

spot832:
root_path: /
uri: spot832.lbl.gov
@@ -90,18 +133,6 @@ globus:
uuid: df82346e-9a15-11ea-b3c4-0ae144191ee3
name: nersc832

nersc7012:
root_path: /global/cfs/cdirs/als/gsharing/data_mover/7012
uri: nersc.gov
uuid: df82346e-9a15-11ea-b3c4-0ae144191ee3
name: nersc7012

data7012:
root_path: /
uri: hpc.lbl.gov
uuid: 639c49be-604f-423c-9c5d-82a53afe1bf1
name: data7012

globus_apps:
als_transfer:
client_id: ${GLOBUS_CLIENT_ID}
28 changes: 28 additions & 0 deletions create_deployments_7011.sh
@@ -0,0 +1,28 @@
export $(grep -v '^#' .env | xargs)


# Create work pools. If a work pool already exists, this emits a warning, which is harmless
prefect work-pool create 'dispatcher_7011_flow_pool'
prefect work-pool create 'new_file_7011_flow_pool'
# prefect work-pool create 'new_file_7011_prune_pool'

# dispatcher_7011_flow_pool
# in docker-compose.yaml:
# command: prefect agent start --pool "dispatcher_7011_flow_pool"
prefect deployment build ./orchestration/flows/bl7011/dispatcher.py:dispatcher -n run_7011_dispatcher -p dispatcher_7011_flow_pool -q dispatcher_7011_queue
prefect deployment apply dispatcher-deployment.yaml

# new_file_7011_flow_pool
# in docker-compose.yaml:
# command: prefect agent start --pool "new_file_7011_flow_pool"
prefect deployment build ./orchestration/flows/bl7011/move.py:process_new_7011_file -n new_file_7011 -p new_file_7011_flow_pool -q new_file_7011_queue
prefect deployment apply process_new_7011_file-deployment.yaml


# TODO: Wait for PR #62 to be merged and use the new prune_controller
# new_file_7011_prune_pool
# in docker-compose.yaml:
# command: prefect agent start --pool "new_file_7011_prune_pool"

# prefect deployment build ./orchestration/flows/bl7011/prune.py:prune_data7011 -n prune_data7011 -p new_file_7011_prune_pool -q prune_data7011_queue
# prefect deployment apply prune_data7011-deployment.yaml
40 changes: 40 additions & 0 deletions docs/mkdocs/docs/bl7011.md
@@ -0,0 +1,40 @@
# Beamline 7.0.1.1 Flows

This page documents the workflows supported by Splash Flows Globus at [ALS Beamline 7.0.1.1 (COSMIC Scattering)](https://als.lbl.gov/beamlines/7-0-1-1/).

## Data at 7.0.1.1

At Beamline 7.0.1.1, users generate data in HDF5 format containing a background-subtracted stack of 2D images with associated LabVIEW metadata. Depending on the experiment, file sizes can exceed 100 GB. An ROI is exported for each dataset.
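As a toy illustration of the data shape only (not the beamline's actual file layout or processing code), a background-subtracted stack of 2D frames can be modeled with NumPy broadcasting:

```python
import numpy as np

# Toy model of a 7.0.1.1-style dataset: a stack of raw 2D frames sharing a
# common background. All names and sizes here are illustrative only.
rng = np.random.default_rng(seed=0)
background = rng.normal(loc=100.0, scale=1.0, size=(64, 64))
raw_stack = background + rng.poisson(lam=5.0, size=(10, 64, 64))  # 10 raw frames

# Broadcasting subtracts the single 2D background from every frame at once.
subtracted = raw_stack - background
print(subtracted.shape)  # (10, 64, 64)
```

Real datasets are far too large to load whole, so production code would read frames lazily from the HDF5 file instead.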

## File Watcher

There is a file watcher on the system `QNAP` that listens for new scans that have finished writing to disk. From there, a Prefect Flow we call `dispatcher` kicks off the downstream steps:
- Copy scans in real time to `NERSC CFS` using Globus Transfer.
- Copy project data to `NERSC HPSS` for long-term storage.
- Analysis on HPC systems (TBD).
- Schedule data pruning from `QNAP` and `NERSC CFS`.
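A minimal polling sketch of this watch-then-dispatch pattern (the heuristics and the dispatch hook below are assumptions for illustration, not the project's actual watcher code):

```python
import time
from pathlib import Path


def is_finished_writing(path: Path, settle_seconds: float = 2.0) -> bool:
    """Heuristic: treat a file as fully written once its size stops changing."""
    size = path.stat().st_size
    time.sleep(settle_seconds)
    return path.stat().st_size == size


def watch(directory: str, dispatch, poll_seconds: float = 5.0) -> None:
    """Poll `directory` for new .h5 scans and hand finished ones to `dispatch`
    (e.g. a call that triggers the `dispatcher` Prefect flow)."""
    seen: set[Path] = set()
    while True:
        for path in sorted(Path(directory).glob("*.h5")):
            if path not in seen and is_finished_writing(path):
                seen.add(path)
                dispatch(str(path))
        time.sleep(poll_seconds)
```

A production watcher would also need to persist the set of seen files across restarts and handle scans that are deleted or renamed mid-write.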


## Prefect Configuration

### Registered Flows

#### `dispatcher.py`

The Dispatcher Prefect Flow manages the logic for handling the order and execution of data tasks. As soon as the File Watcher detects that a new file has been written, it calls the `dispatcher()` Flow. In this case, the dispatcher handles the synchronous call to `move.py`, with the potential to add further steps (e.g., scheduling remote HPC analysis code).

#### `move.py`

Flow to process a new file at BL 7.0.1.1:
1. Copy the file from the QNAP to NERSC CFS. Ingest file path in SciCat.
2. Schedule pruning from QNAP.
3. Copy the file from NERSC CFS to NERSC HPSS. Ingest file path in SciCat.
4. Schedule pruning from NERSC CFS.
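The four steps above can be sketched in plain Python (the real flow uses Prefect tasks, Globus transfers, and SciCat ingestion; every helper name below is hypothetical):

```python
# Hypothetical sketch of the move.py sequence; copy_file stands in for a
# Globus transfer between configured endpoints and always "succeeds" here.
def copy_file(source: str, destination: str, path: str) -> bool:
    return True  # real code: Globus Transfer task


def process_new_file(path: str, log: list) -> None:
    # 1. Copy QNAP -> NERSC CFS, then ingest the CFS file path in SciCat.
    if copy_file("qnap7011_raw", "nersc7011_alsdev_raw", path):
        log.append("scicat: ingested CFS path")
        # 2. Schedule pruning of the copy left on QNAP.
        log.append("prune scheduled: QNAP")
    # 3. Copy NERSC CFS -> NERSC HPSS, then ingest the HPSS file path.
    if copy_file("nersc7011_alsdev_raw", "nersc_hpss", path):
        log.append("scicat: ingested HPSS path")
        # 4. Schedule pruning of the copy on NERSC CFS.
        log.append("prune scheduled: NERSC CFS")


events: list = []
process_new_file("/data/raw/scan_0001.h5", events)
print(len(events))  # 4
```

Note that each ingest/prune step is gated on its preceding copy succeeding, so a failed transfer never schedules pruning of data that has not been safely replicated.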


## VM Details

The computing backend runs on a VM in the B15 server room that is managed by ALS IT staff.

**Name**: `flow-xpcs`
**OS**: `Ubuntu 20.04 LTS` ... **must be updated to `Ubuntu 24.04 LTS`**
6 changes: 4 additions & 2 deletions docs/mkdocs/mkdocs.yml
@@ -13,8 +13,10 @@ nav:
- Home: index.md
- Installation and Requirements: install.md
- Getting Started: getting_started.md
- Compute at ALCF: alcf832.md
- Compute at NERSC: nersc832.md
- Beamline Implementations:
- 7.0.1.1 COSMIC Scattering: bl7011.md
- 8.3.2 Micro Tomography - Compute at ALCF: alcf832.md
- 8.3.2 Micro Tomography - Compute at NERSC: nersc832.md
- Orchestration: orchestration.md
- Configuration: configuration.md
# - Troubleshooting: troubleshooting.md
146 changes: 112 additions & 34 deletions orchestration/_tests/test_globus_flow.py
@@ -51,6 +51,117 @@ def __init__(self, root_path, uuid_value=None):
self.uri = f"mock_endpoint_uri_{self.uuid}"


# Mock the Client class to avoid real network calls
class MockGlobusComputeClient:
def __init__(self, *args, **kwargs):
# No real initialization, as this is a mock
pass

def version_check(self):
# Mock version check to do nothing
pass

def run(self, *args, **kwargs):
# Return a mock task ID
return "mock_task_id"

def get_task(self, task_id):
# Return a mock task response
return {
"pending": False,
"status": "success",
"result": "mock_result"
}

def get_result(self, task_id):
# Return a mock result
return "mock_result"


class MockSecret:
value = str(uuid4())


# ----------------------------
# Tests for 7011
# ----------------------------

class MockConfig7011:
def __init__(self) -> None:
"""
Dummy configuration for 7011 flows.
"""
# Create mock endpoints
self.endpoints = {
"data7011_raw": MockEndpoint(root_path="mock_data7011_raw_path", uuid_value=str(uuid4())),
"nersc7011_alsdev_raw": MockEndpoint(root_path="mock_nersc7011_alsdev_raw_path", uuid_value=str(uuid4())),
}

# Define mock apps
self.apps = {
"als_transfer": "mock_als_transfer_app"
}

# Use the mock transfer client instead of the real TransferClient
self.tc = MockTransferClient()

# Set attributes for easy access
self.data7011_raw = self.endpoints["data7011_raw"]
self.nersc7011_alsdev_raw = self.endpoints["nersc7011_alsdev_raw"]


def test_process_new_7011_file(mocker: MockFixture) -> None:
"""
Test the process_new_7011_file flow from orchestration.flows.bl7011.move.

This test verifies that:
- The get_transfer_controller function is called (patched) with the correct parameters.
- The returned transfer controller's copy method is called with the expected file path,
source, and destination endpoints from the provided configuration.

Parameters:
mocker (MockFixture): The pytest-mock fixture for patching and mocking objects.
"""
# Import the flow to test.
from orchestration.flows.bl7011.move import process_new_7011_file

# Patch the Secret.load and init_transfer_client in the configuration context.
with mocker.patch('prefect.blocks.system.Secret.load', return_value=MockSecret()):
mocker.patch(
"orchestration.flows.bl7011.config.transfer.init_transfer_client",
return_value=mocker.MagicMock() # Return a dummy TransferClient
)
from orchestration.flows.bl7011.config import Config7011

# Instantiate the dummy configuration.
mock_config = Config7011()

# Generate a test file path.
test_file_path = f"/tmp/test_file_{uuid4()}.txt"

# Create a mock transfer controller with a mocked 'copy' method.
mock_transfer_controller = mocker.MagicMock()
mock_transfer_controller.copy.return_value = True

# Patch get_transfer_controller where it is used in process_new_7011_file.
mocker.patch(
"orchestration.flows.bl7011.move.get_transfer_controller",
return_value=mock_transfer_controller
)

# Execute the flow with the test file path and dummy configuration.
result = process_new_7011_file(file_path=test_file_path, config=mock_config)

# Verify that the transfer controller's copy method was called exactly once.
assert mock_transfer_controller.copy.call_count == 1, "Transfer controller copy method should be called exactly once"
assert result is None, "The flow should return None"


# ----------------------------
# Tests for 832
# ----------------------------


class MockConfig832():
def __init__(self) -> None:
# Mock configuration
@@ -94,39 +205,10 @@ def __init__(self) -> None:
self.scicat = config["scicat"]


# Mock the Client class to avoid real network calls
class MockGlobusComputeClient:
def __init__(self, *args, **kwargs):
# No real initialization, as this is a mock
pass

def version_check(self):
# Mock version check to do nothing
pass

def run(self, *args, **kwargs):
# Return a mock task ID
return "mock_task_id"

def get_task(self, task_id):
# Return a mock task response
return {
"pending": False,
"status": "success",
"result": "mock_result"
}

def get_result(self, task_id):
# Return a mock result
return "mock_result"


def test_832_dispatcher(mocker: MockFixture):
"""Test 832 uber decision flow."""

# Mock the Secret block load using a simple manual mock class
class MockSecret:
value = str(uuid4())

mocker.patch('prefect.blocks.system.Secret.load', return_value=MockSecret())

@@ -174,11 +256,7 @@ def test_alcf_recon_flow(mocker: MockFixture):
Case 4) data832->ALCF transfer fails => raises ValueError("Transfer to ALCF Failed")
"""

# 1) Patch Secret.load(...) so HPC calls won't blow up from malformed UUID
mock_secret = mocker.MagicMock()
mock_secret.get.return_value = str(uuid4())

with mocker.patch('prefect.blocks.system.Secret.load', return_value=mock_secret):
with mocker.patch('prefect.blocks.system.Secret.load', return_value=MockSecret()):
# 2) Patch out the calls in Config832 that do real Globus auth:
# a) init_transfer_client(...) used in the constructor
mocker.patch(
Empty file.
14 changes: 14 additions & 0 deletions orchestration/flows/bl7011/config.py
@@ -0,0 +1,14 @@
from globus_sdk import TransferClient
from orchestration.globus import transfer


# TODO: Use BeamlineConfig base class (Waiting for PR #62 to be merged)
class Config7011:
def __init__(self) -> None:
config = transfer.get_config()
self.endpoints = transfer.build_endpoints(config)
self.apps = transfer.build_apps(config)
self.tc: TransferClient = transfer.init_transfer_client(self.apps["als_transfer"])
self.qnap7011 = self.endpoints["qnap7011"]
self.qnap7011_raw = self.endpoints["qnap7011_raw"]
self.nersc7011_alsdev_raw = self.endpoints["nersc7011_alsdev_raw"]
56 changes: 56 additions & 0 deletions orchestration/flows/bl7011/dispatcher.py
@@ -0,0 +1,56 @@
import logging
from prefect import flow
from typing import Optional, Union, Any

from orchestration.flows.bl7011.move import process_new_7011_file

logger = logging.getLogger(__name__)


@flow(name="dispatcher")
def dispatcher(
file_path: Optional[str] = None,
is_export_control: bool = False,
config: Optional[Union[dict, Any]] = None,
) -> None:
"""
Dispatcher flow for the BL 7.0.1.1 beamline that launches process_new_7011_file.

:param file_path: Path to the file to be processed.
:param is_export_control: Flag indicating if export control measures should be applied.
(Not used in the current BL 7.0.1.1 processing)
:param config: Configuration settings for processing.
Expected to be an instance of Config7011 or a dict that can be converted.
:raises ValueError: If no configuration is provided.
:raises TypeError: If the provided configuration is not a dict or Config7011.
"""

logger.info("Starting dispatcher flow for BL 7.0.1.1")
logger.info(f"Parameters received: file_path={file_path}, is_export_control={is_export_control}")

# Validate inputs.
if file_path is None:
logger.error("No file_path provided to dispatcher.")
raise ValueError("File path is required for processing.")

if is_export_control:
logger.error("Data is under export control. Processing is not allowed.")
raise ValueError("Data is under export control. Processing is not allowed.")

if config is None:
logger.error("No configuration provided to dispatcher.")
raise ValueError("Configuration (config) is required for processing.")

try:
process_new_7011_file(
file_path=file_path,
config=config
)
logger.info("Dispatcher flow completed successfully.")
except Exception as e:
logger.error(f"Error during processing in dispatcher flow: {e}")
raise


if __name__ == "__main__":
dispatcher()