2 changes: 2 additions & 0 deletions setup.py
@@ -120,7 +120,9 @@
"DLSXRayCentering = dlstbx.services.xray_centering:DLSXRayCentering",
"CloudStats = dlstbx.services.cloudstats:CloudStats",
"CloudWatcher = dlstbx.services.cloudwatcher:CloudWatcher",
"S3EchoCollector = dlstbx.services.s3echo_collector:S3EchoCollector",
"S3EchoUploader = dlstbx.services.s3echo_uploader:S3EchoUploader",
"S3EchoWatcher = dlstbx.services.s3echo_watcher:S3EchoWatcher",
"SSXPlotter = dlstbx.services.ssx_plotter:SSXPlotter",
# "LoadProducer = dlstbx.services.load_producer:LoadProducer", # tentatively disabled
# "LoadReceiver = dlstbx.services.load_receiver:LoadReceiver", # tentatively disabled
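
These entry points only take effect once the package is reinstalled. A quick way to sanity-check the registration is to query the installed entry points; a minimal sketch, assuming the services live in the `workflows.services` entry-point group (the group name is an assumption based on the zocalo/python-workflows convention, not shown in this hunk) and Python 3.10+:

```python
# Sketch: list the S3Echo* services registered by this change.
# Assumes the "workflows.services" entry-point group (hypothetical here).
from importlib.metadata import entry_points


def find_s3echo_services() -> dict[str, str]:
    # entry_points(group=...) filters installed entry points by group name
    services = entry_points(group="workflows.services")
    return {ep.name: ep.value for ep in services if ep.name.startswith("S3Echo")}


if __name__ == "__main__":
    for name, target in sorted(find_s3echo_services().items()):
        print(f"{name} -> {target}")
```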
78 changes: 41 additions & 37 deletions src/dlstbx/ispybtbx/__init__.py
@@ -894,6 +894,7 @@ def ispyb_filter(
     )
     parameters["ispyb_space_group"] = ""
     parameters["ispyb_related_sweeps"] = []
+    parameters["ispyb_related_images"] = []
     parameters["ispyb_reference_geometry"] = None
     if visit_directory:
         parameters["ispyb_pdb"] = i.get_linked_pdb_files_for_dcid(
@@ -940,16 +941,54 @@ def ispyb_filter(
         # if a sample is linked to the dc, then get dcids on the same sample
         sample_id = parameters["ispyb_dc_info"].get("BLSAMPLEID")
         related_dcids = i.get_sample_dcids(sample_id, session)
-    elif dcid := parameters.get("ispyb_dcid"):
+    elif dc_id:
         # else get dcids collected into the same image directory
-        related_dcids = i.get_related_dcids_same_directory(dcid, session)
+        related_dcids = i.get_related_dcids_same_directory(dc_id, session)
     if related_dcids:
         parameters["ispyb_related_dcids"].append(related_dcids)
     logger.debug(f"ispyb_related_dcids: {parameters['ispyb_related_dcids']}")
     parameters["ispyb_dcg_dcids"] = i.get_dcg_dcids(
         dc_info.get("dataCollectionId"), dc_info.get("dataCollectionGroupId"), session
     )

+    # for the moment we do not want multi-xia2 for /dls/mx i.e. VMXi
+    # beware if other projects start using this directory structure will
+    # need to be smarter here...
+
+    # Handle related DCID properties via DataCollectionGroup, if there is one
+    if dcg_id:
+        stmt = select(
+            models.DataCollection.dataCollectionId,
+            models.DataCollection.startImageNumber,
+            models.DataCollection.numberOfImages,
+            models.DataCollection.overlap,
+            models.DataCollection.axisRange,
+            models.DataCollection.fileTemplate,
+            models.DataCollection.imageDirectory,
+        ).where(models.DataCollection.dataCollectionGroupId == dcg_id)
+
+        related_images = []
+
+        for dc in session.execute(stmt).mappings():
+            start, end = i.dc_info_to_start_end(dc)
+            parameters["ispyb_related_sweeps"].append((dc.dataCollectionId, start, end))
+            parameters["ispyb_related_images"].append(
+                (dc.dataCollectionId, i.dc_info_to_filename(dc))
+            )
+
+            # We don't get related images for /dls/mx collections
+            if (
+                not parameters["ispyb_image_directory"].startswith("/dls/mx")
+                and dc.dataCollectionId != dc_id
+                and i.dc_info_is_rotation_scan(dc)
+            ):
+                related_images.append(
+                    "%s:%d:%d" % (i.dc_info_to_filename(dc), start, end)
+                )
+
+        if not parameters.get("ispyb_images"):
+            parameters["ispyb_images"] = ",".join(related_images)

     pin_info = i.get_pin_info_from_sample_id(
         parameters["ispyb_dc_info"].get("BLSAMPLEID"), session
     )
@@ -988,41 +1027,6 @@ def ispyb_filter(
parameters["ispyb_ssx_events"] = events or None
parameters["ispyb_ssx_shots_per_image"] = shots_per_image or None

# for the moment we do not want multi-xia2 for /dls/mx i.e. VMXi
# beware if other projects start using this directory structure will
# need to be smarter here...

# Handle related DCID properties via DataCollectionGroup, if there is one
if dcg_id:
stmt = select(
models.DataCollection.dataCollectionId,
models.DataCollection.startImageNumber,
models.DataCollection.numberOfImages,
models.DataCollection.overlap,
models.DataCollection.axisRange,
models.DataCollection.fileTemplate,
models.DataCollection.imageDirectory,
).where(models.DataCollection.dataCollectionGroupId == dcg_id)

related_images = []

for dc in session.execute(stmt).mappings():
start, end = i.dc_info_to_start_end(dc)
parameters["ispyb_related_sweeps"].append((dc.dataCollectionId, start, end))

# We don't get related images for /dls/mx collections
if (
not parameters["ispyb_image_directory"].startswith("/dls/mx")
and dc.dataCollectionId != dc_id
and i.dc_info_is_rotation_scan(dc)
):
related_images.append(
"%s:%d:%d" % (i.dc_info_to_filename(dc), start, end)
)

if not parameters.get("ispyb_images"):
parameters["ispyb_images"] = ",".join(related_images)

return message, parameters


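The relocated block now also records `(dcid, master file)` pairs in `ispyb_related_images` alongside the existing sweeps list, and still assembles `ispyb_images` as a comma-separated list of `template:start:end` entries. A self-contained sketch of that string format; the two `dc_info_to_*` helpers below are simplified stand-ins for the real ispybtbx methods, whose exact behaviour (e.g. overlap handling) is not shown in this diff:

```python
# Sketch of the "ispyb_images" format: "template:start:end" per related scan.
import os


def dc_info_to_filename(dc: dict) -> str:
    # Stand-in: join image directory and file template into one path
    return os.path.join(dc["imageDirectory"], dc["fileTemplate"])


def dc_info_to_start_end(dc: dict) -> tuple[int, int]:
    # Stand-in: inclusive image-number range of the collection
    start = dc["startImageNumber"]
    return start, start + dc["numberOfImages"] - 1


def build_ispyb_images(related_dcs: list[dict], this_dcid: int) -> str:
    entries = []
    for dc in related_dcs:
        if dc["dataCollectionId"] == this_dcid:
            continue  # the diff likewise skips the collection being processed
        start, end = dc_info_to_start_end(dc)
        entries.append("%s:%d:%d" % (dc_info_to_filename(dc), start, end))
    return ",".join(entries)


print(build_ispyb_images(
    [{"dataCollectionId": 2, "imageDirectory": "/dls/i03/data",
      "fileTemplate": "thaum_1_####.cbf", "startImageNumber": 1,
      "numberOfImages": 900}],
    this_dcid=1,
))  # -> /dls/i03/data/thaum_1_####.cbf:1:900
```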
67 changes: 44 additions & 23 deletions src/dlstbx/services/mimas.py
@@ -231,6 +231,29 @@ def get_cloudbursting_spec(
         # Create cloud specification entry for each element in zocalo.mimas.cloud
         # Add specification to the list if science cluster if oversubscribed
         # and cluster statistics are up-to-date
+        is_iris_live = (
+            self.cluster_stats["iris"]["last_cluster_update"] > timeout_threshold
+        )
+        is_s3echo_live = (
+            self.cluster_stats["s3echo"]["last_cluster_update"] > timeout_threshold
+        )
+        is_s3echo_quota = self.cluster_stats["s3echo"]["total"] < s3echo_quota
+        cloudbursting = is_iris_live and is_s3echo_live and is_s3echo_quota
+        self.log.debug(
+            pformat(
+                {
+                    "is_iris_live": is_iris_live,
+                    "is_s3echo_live": is_s3echo_live,
+                    "is_s3echo_quota": is_s3echo_quota,
+                }
+            )
+        )
+        if not cloudbursting:
+            return cloud_spec_list
+
+        # Create cloud specification entry for each element in zocalo.mimas.cloud
+        # Add specification to the list if science cluster if oversubscribed
+        # and cluster statistics are up-to-date
         for group in self.config.storage.get("zocalo.mimas.cloud", []):
             if not group.get("cloudbursting", True):
                 continue
@@ -240,30 +263,28 @@
                 beamlines=set(group.get("beamlines", []))
             )
             group_max_jobs_waiting = group.get("max_jobs_waiting", max_jobs_waiting)
-            if (
-                (
-                    (
-                        self.cluster_stats["slurm"]["jobs_waiting"]
-                        > group_max_jobs_waiting["slurm"]
-                    )
-                    or (
-                        self.cluster_stats["slurm"]["last_cluster_update"]
-                        < timeout_threshold
-                    )
-                )
-                and (
-                    self.cluster_stats["iris"]["jobs_waiting"]
-                    < group_max_jobs_waiting["iris"]
-                )
-                and (
-                    self.cluster_stats["iris"]["last_cluster_update"]
-                    > timeout_threshold
-                )
-                and (
-                    self.cluster_stats["s3echo"]["last_cluster_update"]
-                    > timeout_threshold
-                )
-            ):
+            is_slurm_max_jobs = (
+                self.cluster_stats["slurm"]["jobs_waiting"]
+                > group_max_jobs_waiting["slurm"]
+            )
+            is_slurm_timeout = (
+                self.cluster_stats["slurm"]["last_cluster_update"]
+                < timeout_threshold
+            )
+            is_iris_max_jobs = (
+                self.cluster_stats["iris"]["jobs_waiting"]
+                < group_max_jobs_waiting["iris"]
+            )
+            self.log.debug(
+                pformat(
+                    {
+                        "is_slurm_max_jobs": is_slurm_max_jobs,
+                        "is_slurm_timeout": is_slurm_timeout,
+                        "is_iris_max_jobs": is_iris_max_jobs,
+                    }
+                )
+            )
+            if (is_slurm_max_jobs or is_slurm_timeout) and is_iris_max_jobs:
                 cloud_spec_list.append(
                     {
                         "cloud_spec": cloud_spec,
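
The refactor splits one deeply nested condition into named booleans and adds a global gate: no group is considered for cloudbursting unless the IRIS and S3 Echo statistics are fresh and Echo usage is under quota. A condensed sketch of the combined decision; the `cluster_stats` shape and field names mirror the diff, while the quota and timeout values are illustrative stand-ins, with `timeout_threshold` assumed to be a "stats newer than this" timestamp as in the diff:

```python
# Sketch of the combined burst decision, under the assumptions stated above.
import time


def should_burst(cluster_stats: dict, group_max_jobs_waiting: dict,
                 s3echo_quota: int, stats_timeout: float = 300.0) -> bool:
    # Statistics older than this timestamp are considered stale
    timeout_threshold = time.time() - stats_timeout

    # Global gate: both clouds reporting recently, Echo usage under quota
    is_iris_live = cluster_stats["iris"]["last_cluster_update"] > timeout_threshold
    is_s3echo_live = cluster_stats["s3echo"]["last_cluster_update"] > timeout_threshold
    is_s3echo_quota = cluster_stats["s3echo"]["total"] < s3echo_quota
    if not (is_iris_live and is_s3echo_live and is_s3echo_quota):
        return False

    # Per-group decision: local Slurm is oversubscribed (or its stats are
    # stale), and IRIS still has waiting-job headroom
    is_slurm_max_jobs = (
        cluster_stats["slurm"]["jobs_waiting"] > group_max_jobs_waiting["slurm"]
    )
    is_slurm_timeout = (
        cluster_stats["slurm"]["last_cluster_update"] < timeout_threshold
    )
    is_iris_max_jobs = (
        cluster_stats["iris"]["jobs_waiting"] < group_max_jobs_waiting["iris"]
    )
    return (is_slurm_max_jobs or is_slurm_timeout) and is_iris_max_jobs
```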
151 changes: 151 additions & 0 deletions src/dlstbx/services/s3echo_collector.py
@@ -0,0 +1,151 @@
from __future__ import annotations

import minio
import workflows.recipe
from workflows.services.common_service import CommonService

from dlstbx.util import iris
from dlstbx.util.iris import get_minio_client, update_dcid_info_file


class S3EchoCollector(CommonService):
    """
    A service that keeps status of uploads to S3 Echo object store and does garbage collection of unreferenced data.
    """

    # Human readable service name
    _service_name = "S3EchoCollector"

    # Logger name
    _logger_name = "dlstbx.services.s3echocollector"

    # STFC S3 Echo credentials
    _s3echo_credentials = "/dls_sw/apps/zocalo/secrets/credentials-echo-mx.cfg"

    def initializing(self):
        """
        Register callback functions to upload and download data from S3 Echo object store.
        """
        self.log.info(f"{S3EchoCollector._service_name} starting")

        self.minio_client: minio.Minio = get_minio_client(
            S3EchoCollector._s3echo_credentials
        )

        self._message_delay = 5

        workflows.recipe.wrap_subscribe(
            self._transport,
            "s3echo.start",
            self.on_start,
            acknowledgement=True,
            log_extender=self.extend_log,
        )

        workflows.recipe.wrap_subscribe(
            self._transport,
            "s3echo.end",
            self.on_end,
            acknowledgement=True,
            log_extender=self.extend_log,
        )

    def on_start(self, rw, header, message):
        """
        Process request for uploading images to S3 Echo object store.
        """
        # Conditionally acknowledge receipt of the message
        txn = rw.transport.transaction_begin(subscription_id=header["subscription"])
        rw.transport.ack(header, transaction=txn)

        params = rw.recipe_step["parameters"]
        minio_client = get_minio_client(S3EchoCollector._s3echo_credentials)

        bucket_name = params["bucket"]
        if not minio_client.bucket_exists(bucket_name):
            minio_client.make_bucket(bucket_name)
        rpid = int(params["rpid"])

        s3echo_upload_files = {}
        if images := params.get("images"):
            dcid = int(params["dcid"])
            response_info = update_dcid_info_file(
                minio_client, bucket_name, dcid, 0, rpid, self.log
            )
            try:
                image_files = iris.get_image_files(images, self.log)
                s3echo_upload_files.update(
                    {name: (dcid, pth) for name, pth in image_files.items()}
                )
            except Exception:
                self.log.exception("Error uploading image files to S3 Echo")
            if not response_info:
                self.log.debug("Sending message to upload endpoint")
                rw.send_to(
                    "upload", {"s3echo_upload": {dcid: image_files}}, transaction=txn
                )
            rw.environment.update({"s3echo_upload": s3echo_upload_files})
            self.log.debug("Sending message to watch endpoint")
            rw.send_to("watch", message, transaction=txn)
        elif params.get("related_images"):
            for dcid, image_master_file in params.get("related_images"):
                response_info = update_dcid_info_file(
                    minio_client, bucket_name, dcid, 0, rpid, self.log
                )
                try:
                    image_files = iris.get_related_images_files_from_h5(
                        image_master_file, self.log
                    )
                    s3echo_upload_files.update(
                        {name: (dcid, pth) for name, pth in image_files.items()}
                    )
                    if not response_info:
                        self.log.debug("Sending message to upload endpoint")
                        rw.send_to(
                            "upload",
                            {"s3echo_upload": {dcid: image_files}},
                            transaction=txn,
                        )
                except Exception:
                    self.log.exception("Error uploading image files to S3 Echo")
            rw.environment.update({"s3echo_upload": s3echo_upload_files})
            self.log.debug("Sending message to watch endpoint")
            rw.send_to("watch", message, transaction=txn)
        rw.transport.transaction_commit(txn)

    def on_end(self, rw, header, message):
        """
        Remove reference to image data in S3 Echo object store after end of processing.
        """
        # Conditionally acknowledge receipt of the message
        txn = rw.transport.transaction_begin(subscription_id=header["subscription"])
        rw.transport.ack(header, transaction=txn)

        params = rw.recipe_step["parameters"]
        minio_client = get_minio_client(S3EchoCollector._s3echo_credentials)
        bucket_name = params["bucket"]
        rpid = int(params["rpid"])

        for dcid, _ in params.get("related_images", [(int(params["dcid"]), None)]):
            response_info = update_dcid_info_file(
                minio_client, bucket_name, dcid, None, None, self.log
            )
            if not response_info:
                self.log.warning(f"No {dcid}_info data read from the object store")
            elif response_info["status"] == -1 or (
                response_info["status"] == 1 and response_info["pid"] == [rpid]
            ):
                dc_objects = {
                    obj.object_name
                    for obj in minio_client.list_objects(bucket_name)
                    if obj.object_name is not None
                }
                for obj_name in dc_objects:
                    if obj_name.startswith(f"{dcid}_"):
                        minio_client.remove_object(bucket_name, obj_name)
            else:
                update_dcid_info_file(
                    minio_client, bucket_name, dcid, None, -rpid, self.log
                )

        rw.transport.transaction_commit(txn)
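
`on_end` only deletes the `{dcid}_*` objects once no other processing run still references them, driven by the `{dcid}_info` record that `update_dcid_info_file` maintains. A minimal in-memory model of that bookkeeping, under the assumption (inferred from the calls above, not from the helper's source) that the info record keeps a list of processing ids and that `status == 1 and pid == [rpid]` means only the current run still holds a reference:

```python
# Toy reference-counting model of the "{dcid}_info" record (an assumption,
# not the real update_dcid_info_file semantics).
class DcidInfoModel:
    def __init__(self) -> None:
        self.pids: list[int] = []  # processing runs referencing this DCID's images

    def register(self, rpid: int) -> None:
        # on_start: record that a processing run uses the uploaded images
        if rpid not in self.pids:
            self.pids.append(rpid)

    def release(self, rpid: int) -> bool:
        # on_end: drop the reference; True means the objects may be collected
        if rpid in self.pids:
            self.pids.remove(rpid)
        return not self.pids


info = DcidInfoModel()
info.register(101)
info.register(102)
assert info.release(101) is False  # run 102 still needs the images
assert info.release(102) is True   # last reference gone: remove "{dcid}_*" objects
```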