diff --git a/setup.py b/setup.py index bd3a71d0e..42de87628 100644 --- a/setup.py +++ b/setup.py @@ -120,7 +120,9 @@ "DLSXRayCentering = dlstbx.services.xray_centering:DLSXRayCentering", "CloudStats = dlstbx.services.cloudstats:CloudStats", "CloudWatcher = dlstbx.services.cloudwatcher:CloudWatcher", + "S3EchoCollector = dlstbx.services.s3echo_collector:S3EchoCollector", "S3EchoUploader = dlstbx.services.s3echo_uploader:S3EchoUploader", + "S3EchoWatcher = dlstbx.services.s3echo_watcher:S3EchoWatcher", "SSXPlotter = dlstbx.services.ssx_plotter:SSXPlotter", # "LoadProducer = dlstbx.services.load_producer:LoadProducer", # tentatively disabled # "LoadReceiver = dlstbx.services.load_receiver:LoadReceiver", # tentatively disabled diff --git a/src/dlstbx/ispybtbx/__init__.py b/src/dlstbx/ispybtbx/__init__.py index 54c1f9efc..3ac27fe92 100644 --- a/src/dlstbx/ispybtbx/__init__.py +++ b/src/dlstbx/ispybtbx/__init__.py @@ -894,6 +894,7 @@ def ispyb_filter( ) parameters["ispyb_space_group"] = "" parameters["ispyb_related_sweeps"] = [] + parameters["ispyb_related_images"] = [] parameters["ispyb_reference_geometry"] = None if visit_directory: parameters["ispyb_pdb"] = i.get_linked_pdb_files_for_dcid( @@ -940,9 +941,9 @@ def ispyb_filter( # if a sample is linked to the dc, then get dcids on the same sample sample_id = parameters["ispyb_dc_info"].get("BLSAMPLEID") related_dcids = i.get_sample_dcids(sample_id, session) - elif dcid := parameters.get("ispyb_dcid"): + elif dc_id: # else get dcids collected into the same image directory - related_dcids = i.get_related_dcids_same_directory(dcid, session) + related_dcids = i.get_related_dcids_same_directory(dc_id, session) if related_dcids: parameters["ispyb_related_dcids"].append(related_dcids) logger.debug(f"ispyb_related_dcids: {parameters['ispyb_related_dcids']}") @@ -950,6 +951,44 @@ def ispyb_filter( dc_info.get("dataCollectionId"), dc_info.get("dataCollectionGroupId"), session ) + # for the moment we do not want multi-xia2 for /dls/mx i.e. VMXi + # beware if other projects start using this directory structure will + # need to be smarter here... 
+ + # Handle related DCID properties via DataCollectionGroup, if there is one + if dcg_id: + stmt = select( + models.DataCollection.dataCollectionId, + models.DataCollection.startImageNumber, + models.DataCollection.numberOfImages, + models.DataCollection.overlap, + models.DataCollection.axisRange, + models.DataCollection.fileTemplate, + models.DataCollection.imageDirectory, + ).where(models.DataCollection.dataCollectionGroupId == dcg_id) + + related_images = [] + + for dc in session.execute(stmt).mappings(): + start, end = i.dc_info_to_start_end(dc) + parameters["ispyb_related_sweeps"].append((dc.dataCollectionId, start, end)) + parameters["ispyb_related_images"].append( + (dc.dataCollectionId, i.dc_info_to_filename(dc)) + ) + + # We don't get related images for /dls/mx collections + if ( + not parameters["ispyb_image_directory"].startswith("/dls/mx") + and dc.dataCollectionId != dc_id + and i.dc_info_is_rotation_scan(dc) + ): + related_images.append( + "%s:%d:%d" % (i.dc_info_to_filename(dc), start, end) + ) + + if not parameters.get("ispyb_images"): + parameters["ispyb_images"] = ",".join(related_images) + pin_info = i.get_pin_info_from_sample_id( parameters["ispyb_dc_info"].get("BLSAMPLEID"), session ) @@ -988,41 +1027,6 @@ def ispyb_filter( parameters["ispyb_ssx_events"] = events or None parameters["ispyb_ssx_shots_per_image"] = shots_per_image or None - # for the moment we do not want multi-xia2 for /dls/mx i.e. VMXi - # beware if other projects start using this directory structure will - # need to be smarter here... - - # Handle related DCID properties via DataCollectionGroup, if there is one - if dcg_id: - stmt = select( - models.DataCollection.dataCollectionId, - models.DataCollection.startImageNumber, - models.DataCollection.numberOfImages, - models.DataCollection.overlap, - models.DataCollection.axisRange, - models.DataCollection.fileTemplate, - models.DataCollection.imageDirectory, - ).where(models.DataCollection.dataCollectionGroupId == dcg_id) - - related_images = [] - - for dc in session.execute(stmt).mappings(): - start, end = i.dc_info_to_start_end(dc) - parameters["ispyb_related_sweeps"].append((dc.dataCollectionId, start, end)) - - # We don't get related images for /dls/mx collections - if ( - not parameters["ispyb_image_directory"].startswith("/dls/mx") - and dc.dataCollectionId != dc_id - and i.dc_info_is_rotation_scan(dc) - ): - related_images.append( - "%s:%d:%d" % (i.dc_info_to_filename(dc), start, end) - ) - - if not parameters.get("ispyb_images"): - parameters["ispyb_images"] = ",".join(related_images) - return message, parameters diff --git a/src/dlstbx/services/mimas.py b/src/dlstbx/services/mimas.py index 9f8eb8180..e99c562da 100644 --- a/src/dlstbx/services/mimas.py +++ b/src/dlstbx/services/mimas.py @@ -231,6 +231,29 @@ def get_cloudbursting_spec( # Create cloud specification entry for each element in zocalo.mimas.cloud # Add specification to the list if science cluster if oversubscribed # and cluster statistics are up-to-date + is_iris_live = ( + self.cluster_stats["iris"]["last_cluster_update"] > timeout_threshold + ) + is_s3echo_live = ( + self.cluster_stats["s3echo"]["last_cluster_update"] > timeout_threshold + ) + is_s3echo_quota = self.cluster_stats["s3echo"]["total"] < s3echo_quota + cloudbursting = is_iris_live and is_s3echo_live and is_s3echo_quota + self.log.debug( + pformat( + { + "is_iris_live": is_iris_live, + "is_s3echo_live": is_s3echo_live, + "is_s3echo_quota": is_s3echo_quota, + } + ) + ) + if not cloudbursting: + return cloud_spec_list + + # 
Create cloud specification entry for each element in zocalo.mimas.cloud + # Add specification to the list if science cluster if oversubscribed + # and cluster statistics are up-to-date for group in self.config.storage.get("zocalo.mimas.cloud", []): if not group.get("cloudbursting", True): continue @@ -240,30 +263,28 @@ def get_cloudbursting_spec( beamlines=set(group.get("beamlines", [])) ) group_max_jobs_waiting = group.get("max_jobs_waiting", max_jobs_waiting) - if ( - ( - ( - self.cluster_stats["slurm"]["jobs_waiting"] - > group_max_jobs_waiting["slurm"] - ) - or ( - self.cluster_stats["slurm"]["last_cluster_update"] - < timeout_threshold - ) - ) - and ( - self.cluster_stats["iris"]["jobs_waiting"] - < group_max_jobs_waiting["iris"] - ) - and ( - self.cluster_stats["iris"]["last_cluster_update"] - > timeout_threshold - ) - and ( - self.cluster_stats["s3echo"]["last_cluster_update"] - > timeout_threshold + is_slurm_max_jobs = ( + self.cluster_stats["slurm"]["jobs_waiting"] + > group_max_jobs_waiting["slurm"] + ) + is_slurm_timeout = ( + self.cluster_stats["slurm"]["last_cluster_update"] + < timeout_threshold + ) + is_iris_max_jobs = ( + self.cluster_stats["iris"]["jobs_waiting"] + < group_max_jobs_waiting["iris"] + ) + self.log.debug( + pformat( + { + "is_slurm_max_jobs": is_slurm_max_jobs, + "is_slurm_timeout": is_slurm_timeout, + "is_iris_max_jobs": is_iris_max_jobs, + } ) - ): + ) + if (is_slurm_max_jobs or is_slurm_timeout) and is_iris_max_jobs: cloud_spec_list.append( { "cloud_spec": cloud_spec, diff --git a/src/dlstbx/services/s3echo_collector.py b/src/dlstbx/services/s3echo_collector.py new file mode 100644 index 000000000..c779ac5f8 --- /dev/null +++ b/src/dlstbx/services/s3echo_collector.py @@ -0,0 +1,151 @@ +from __future__ import annotations + +import minio +import workflows.recipe +from workflows.services.common_service import CommonService + +from dlstbx.util import iris +from dlstbx.util.iris import get_minio_client, update_dcid_info_file + + +class S3EchoCollector(CommonService): + """ + A service that keeps status of uploads to S3 Echo object store and does garbage collection of unreferenced data. + """ + + # Human readable service name + _service_name = "S3EchoCollector" + + # Logger name + _logger_name = "dlstbx.services.s3echocollector" + + # STFC S3 Echo credentials + _s3echo_credentials = "/dls_sw/apps/zocalo/secrets/credentials-echo-mx.cfg" + + def initializing(self): + """ + Register callback functions to upload and download data from S3 Echo object store. + """ + self.log.info(f"{S3EchoCollector._service_name} starting") + + self.minio_client: minio.Minio = get_minio_client( + S3EchoCollector._s3echo_credentials + ) + + self._message_delay = 5 + + workflows.recipe.wrap_subscribe( + self._transport, + "s3echo.start", + self.on_start, + acknowledgement=True, + log_extender=self.extend_log, + ) + + workflows.recipe.wrap_subscribe( + self._transport, + "s3echo.end", + self.on_end, + acknowledgement=True, + log_extender=self.extend_log, + ) + + def on_start(self, rw, header, message): + """ + Process request for uploading images to S3 Echo object store. 
+ """ + # Conditionally acknowledge receipt of the message + txn = rw.transport.transaction_begin(subscription_id=header["subscription"]) + rw.transport.ack(header, transaction=txn) + + params = rw.recipe_step["parameters"] + minio_client = get_minio_client(S3EchoCollector._s3echo_credentials) + + bucket_name = params["bucket"] + if not minio_client.bucket_exists(bucket_name): + minio_client.make_bucket(bucket_name) + rpid = int(params["rpid"]) + + s3echo_upload_files = {} + if images := params.get("images"): + dcid = int(params["dcid"]) + response_info = update_dcid_info_file( + minio_client, bucket_name, dcid, 0, rpid, self.log + ) + try: + image_files = iris.get_image_files(images, self.log) + s3echo_upload_files.update( + {name: (dcid, pth) for name, pth in image_files.items()} + ) + except Exception: + self.log.exception("Error uploading image files to S3 Echo") + if not response_info: + self.log.debug("Sending message to upload endpoint") + rw.send_to( + "upload", {"s3echo_upload": {dcid: image_files}}, transaction=txn + ) + rw.environment.update({"s3echo_upload": s3echo_upload_files}) + self.log.debug("Sending message to watch endpoint") + rw.send_to("watch", message, transaction=txn) + elif params.get("related_images"): + for dcid, image_master_file in params.get("related_images"): + response_info = update_dcid_info_file( + minio_client, bucket_name, dcid, 0, rpid, self.log + ) + try: + image_files = iris.get_related_images_files_from_h5( + image_master_file, self.log + ) + s3echo_upload_files.update( + {name: (dcid, pth) for name, pth in image_files.items()} + ) + if not response_info: + self.log.debug("Sending message to upload endpoint") + rw.send_to( + "upload", + {"s3echo_upload": {dcid: image_files}}, + transaction=txn, + ) + except Exception: + self.log.exception("Error uploading image files to S3 Echo") + rw.environment.update({"s3echo_upload": s3echo_upload_files}) + self.log.debug("Sending message to watch endpoint") + rw.send_to("watch", message, transaction=txn) + rw.transport.transaction_commit(txn) + + def on_end(self, rw, header, message): + """ + Remove reference to image data in S3 Echo object store after end of processing. 
+ """ + # Conditionally acknowledge receipt of the message + txn = rw.transport.transaction_begin(subscription_id=header["subscription"]) + rw.transport.ack(header, transaction=txn) + + params = rw.recipe_step["parameters"] + minio_client = get_minio_client(S3EchoCollector._s3echo_credentials) + bucket_name = params["bucket"] + rpid = int(params["rpid"]) + + for dcid, _ in params.get("related_images", [(int(params["dcid"]), None)]): + response_info = update_dcid_info_file( + minio_client, bucket_name, dcid, None, None, self.log + ) + if not response_info: + self.log.warning(f"No {dcid}_info data read from the object store") + elif response_info["status"] == -1 or ( + response_info["status"] == 1 and response_info["pid"] == [rpid] + ): + dc_objects = { + obj.object_name + for obj in minio_client.list_objects(bucket_name) + if obj.object_name is not None + } + for obj_name in dc_objects: + if obj_name.startswith(f"{dcid}_"): + minio_client.remove_object(bucket_name, obj_name) + else: + update_dcid_info_file( + minio_client, bucket_name, dcid, None, -rpid, self.log + ) + + rw.transport.transaction_commit(txn) diff --git a/src/dlstbx/services/s3echo_uploader.py b/src/dlstbx/services/s3echo_uploader.py index 0d849a7f8..15e019c1f 100644 --- a/src/dlstbx/services/s3echo_uploader.py +++ b/src/dlstbx/services/s3echo_uploader.py @@ -12,6 +12,7 @@ get_presigned_urls, remove_objects_from_s3, retrieve_results_from_s3, + update_dcid_info_file, ) @@ -21,7 +22,7 @@ class S3EchoUploader(CommonService): """ # Human readable service name - _service_name = "S3Echouploader" + _service_name = "S3EchoUploader" # Logger name _logger_name = "dlstbx.services.s3echouploader" @@ -39,8 +40,16 @@ def initializing(self): workflows.recipe.wrap_subscribe( self._transport, - "s3echo.upload", - self.on_upload, + "s3echo.upload.rpid", + self.on_upload_rpid, + acknowledgement=True, + log_extender=self.extend_log, + ) + + workflows.recipe.wrap_subscribe( + self._transport, + "s3echo.upload.dcid", + self.on_upload_dcid, acknowledgement=True, log_extender=self.extend_log, ) @@ -53,7 +62,7 @@ def initializing(self): log_extender=self.extend_log, ) - def on_upload(self, rw, header, message): + def on_upload_rpid(self, rw, header, message): """ Upload images from DLS filesystem to S3 Echo object store. """ @@ -96,6 +105,7 @@ def on_upload(self, rw, header, message): [ filepath, ], + True, self.log, ) except S3Error as err: @@ -121,9 +131,94 @@ def on_upload(self, rw, header, message): # Commit transaction rw.transport.transaction_commit(txn) + def on_upload_dcid(self, rw, header, message): + """ + Upload images from DLS filesystem to S3 Echo object store. + """ + # Conditionally acknowledge receipt of the message + txn = rw.transport.transaction_begin(subscription_id=header["subscription"]) + rw.transport.ack(header, transaction=txn) + + params = rw.recipe_step["parameters"] + minio_client = get_minio_client(S3EchoUploader._s3echo_credentials) + + # We have a list of files to upload set in recipe environment and we receive + # a list of already uploaded files via message. To find which files still + # need to be uploaded to S3 Echo we pattern match filenames of all files + # set in environment to list of uploaded file names in the message that + # are prefixed with processingjobid value. 
+ s3echo_upload_files_all = rw.environment.get( + "s3echo_upload", {} + ) or message.get("s3echo_upload", {}) + for dcid, s3echo_upload_files in s3echo_upload_files_all.items(): + upload_file_list = s3echo_upload_files.keys() + if ( + s3_urls := message.get("s3_urls", {}) + if isinstance(message, dict) + else {} + ): + upload_file_list = sorted( + { + file_name + for file_name in s3echo_upload_files + if all(file_name not in upload_name for upload_name in s3_urls) + } + ) + try: + filename = next(iter(upload_file_list)) + filepath = s3echo_upload_files.get(filename) + except StopIteration: + self.log.exception( + f"No more files to upload to S3 bucket {params['bucket']} after receiving following file list:\n{s3_urls}" + ) + rw.send_to("success", message, transaction=txn) + else: + try: + upload_s3_url = get_presigned_urls( + minio_client, + params["bucket"], + dcid, + [ + filepath, + ], + True, + self.log, + ) + except S3Error as err: + update_dcid_info_file( + minio_client, params["bucket"], dcid, -1, None, self.log + ) + self.log.exception( + f"Error uploading following files to S3 bucket {params['bucket']}:\n{pformat(rw.environment['s3echo_upload'])}" + ) + rw.send_to("failure", message, transaction=txn) + raise err + else: + # If all files have been uploaded, we add dictionary with uploaded file info to the + # recipe environment and send it to success channel. Otherwise, we upload a single file, + # add it to the dictionary of uploaded files and checkpoint message containing it. + s3_urls.update(upload_s3_url) + if len(s3_urls) < len(s3echo_upload_files): + rw.checkpoint( + { + "s3_urls": s3_urls, + "s3echo_upload": s3echo_upload_files_all, + }, + delay=self._message_delay, + transaction=txn, + ) + else: + update_dcid_info_file( + minio_client, params["bucket"], dcid, 1, None, self.log + ) + rw.environment["s3_urls"] = s3_urls + rw.send_to("success", "Finished processing", transaction=txn) + # Commit transaction + rw.transport.transaction_commit(txn) + def on_download(self, rw, header, message): """ - Download files from S3 Echo object store tto DLS filesystem. Remove image files, if requested. + Download files from S3 Echo object store to DLS filesystem. Remove image files, if requested. """ # Conditionally acknowledge receipt of the message txn = rw.transport.transaction_begin(subscription_id=header["subscription"]) @@ -148,7 +243,9 @@ def on_download(self, rw, header, message): else: rw.send_to("success", message, transaction=txn) - if s3_urls := rw.environment.get("s3_urls"): + # For downstream tasks processing data are removed here as uploads are done per prcessing job. + # For data reduction tasks data is shared between different pipelines as removed by S3EchoCollector service. 
+        if s3_urls := params.get("cleanup", True) and rw.environment.get("s3_urls"):
             remove_objects_from_s3(minio_client, params["bucket"], s3_urls, self.log)
 
         # Commit transaction
diff --git a/src/dlstbx/services/s3echo_watcher.py b/src/dlstbx/services/s3echo_watcher.py
new file mode 100644
index 000000000..5620ee90f
--- /dev/null
+++ b/src/dlstbx/services/s3echo_watcher.py
@@ -0,0 +1,164 @@
+from __future__ import annotations
+
+from pathlib import Path
+from pprint import pformat, pprint
+
+import minio
+import workflows.recipe
+from minio.error import S3Error
+from workflows.services.common_service import CommonService
+
+from dlstbx.util.iris import (
+    get_minio_client,
+    get_presigned_urls,
+    update_dcid_info_file,
+)
+
+
+class S3EchoWatcher(CommonService):
+    """
+    A service that watches the status of file uploads to the S3 Echo object store.
+    """
+
+    # Human readable service name
+    _service_name = "S3EchoWatcher"
+
+    # Logger name
+    _logger_name = "dlstbx.services.s3echowatcher"
+
+    # STFC S3 Echo credentials
+    _s3echo_credentials = "/dls_sw/apps/zocalo/secrets/credentials-echo-mx.cfg"
+
+    def initializing(self):
+        """
+        Register the callback function that watches data uploads to the S3 Echo object store.
+        """
+        self.log.info(f"{S3EchoWatcher._service_name} starting")
+
+        self.minio_client: minio.Minio = get_minio_client(
+            S3EchoWatcher._s3echo_credentials
+        )
+
+        self._message_delay = 5
+
+        workflows.recipe.wrap_subscribe(
+            self._transport,
+            "s3echo.watch",
+            self.watch,
+            acknowledgement=True,
+            log_extender=self.extend_log,
+        )
+
+    def watch(self, rw, header, message):
+        """
+        Watch the upload status of image files in the S3 Echo object store.
+        """
+        # Conditionally acknowledge receipt of the message
+        txn = rw.transport.transaction_begin(subscription_id=header["subscription"])
+        rw.transport.ack(header, transaction=txn)
+
+        params = rw.recipe_step["parameters"]
+        dcid = int(params["dcid"])
+        minio_client = get_minio_client(S3EchoWatcher._s3echo_credentials)
+
+        bucket_name = params["bucket"]
+        if not minio_client.bucket_exists(bucket_name):
+            # Stop processing message. No files to watch
+            rw.transport.transaction_commit(txn)
+            return
+
+        # The recipe environment holds the list of files scheduled for upload and
+        # the message carries the names of files that have already been uploaded.
+        # To find which files are still awaiting upload we match the filenames
+        # recorded in the environment against the uploaded object names in the
+        # message, which are prefixed with the data collection id (dcid).
+ s3echo_upload_files = rw.environment.get("s3echo_upload", {}) + + response_info = update_dcid_info_file( + minio_client, bucket_name, dcid, None, None, self.log + ) + if response_info and response_info["status"] == -1: + self.log.error( + f"Upload of the following files to S3 bucket {params['bucket']} reported in error state:\n{pformat(rw.environment['s3echo_upload'])}" + ) + rw.send_to("failure", message, transaction=txn) + return False + + upload_file_list = s3echo_upload_files.keys() + pprint(s3echo_upload_files) + if s3_urls := message.get("s3_urls", {}) if isinstance(message, dict) else {}: + upload_file_list = sorted( + { + filename + for filename, (dcid, _) in s3echo_upload_files.items() + if all( + f"{dcid}_{filename}" not in upload_name + for upload_name in s3_urls + ) + } + ) + pprint(upload_file_list) + filename = next(iter(upload_file_list)) + self.log.debug(f"Looking for {filename} file upload status.") + try: + dcid, filepath = s3echo_upload_files.get(filename) + result = self.minio_client.stat_object(bucket_name, f"{dcid}_{filename}") + file_size = Path(filepath).stat().st_size + except minio.error.S3Error: + # File hasn't been uploaded yet + self.log.debug(f"File {filename} hasn't been uploaded yet.") + rw.checkpoint( + message, + delay=self._message_delay, + transaction=txn, + ) + # Stop processing message + rw.transport.transaction_commit(txn) + return + if file_size != result.size: + # File is still being uploaded + self.log.debug(f"File {filename} has been partially uploaded.") + rw.checkpoint( + message, + delay=self._message_delay, + transaction=txn, + ) + # Stop processing message + rw.transport.transaction_commit(txn) + return + try: + upload_s3_url = get_presigned_urls( + minio_client, + params["bucket"], + dcid, + [ + filepath, + ], + False, + self.log, + ) + except S3Error as err: + self.log.exception( + f"Error retrieving following file from S3 bucket {params['bucket']}:\n{filepath}" + ) + rw.send_to("failure", message, transaction=txn) + raise err + else: + # If all files have been uploaded, we add dictionary with uploaded file info to the + # recipe environment and send it to success channel. Otherwise, we upload a single file, + # add it to the dictionary of uploaded files and checkpoint message containing it. + s3_urls.update(upload_s3_url) + self.log.debug( + f"File {filename} has been uploaded to the S3 Echo object store." 
+ ) + if len(s3_urls) < len(s3echo_upload_files): + rw.checkpoint( + {"s3_urls": s3_urls}, + delay=self._message_delay, + transaction=txn, + ) + else: + rw.environment["s3_urls"] = s3_urls + rw.send_to("success", "Finished processing", transaction=txn) + # Commit transaction + rw.transport.transaction_commit(txn) diff --git a/src/dlstbx/util/iris.py b/src/dlstbx/util/iris.py index da9e144db..bb85d9c5c 100644 --- a/src/dlstbx/util/iris.py +++ b/src/dlstbx/util/iris.py @@ -2,8 +2,9 @@ import configparser import glob +import io +import json import os -import shutil import subprocess import time import urllib.parse @@ -107,22 +108,27 @@ def remove_objects_from_s3(minio_clinet, bucket_name, s3_urls, logger): ) -def get_image_files(working_directory, images, logger): +def get_image_files(images, logger): file_list = {} h5_paths = {Path(s.split(":")[0]) for s in images.split(",")} for h5_file in h5_paths: - try: - related = set(find_all_references(h5_file)) - except (ValueError, KeyError): - logger.error(f"Could not find files related to {h5_file}", exc_info=True) - image_pattern = str(h5_file).split("master")[0] + "*" - related.union(glob.glob(image_pattern)) - for filepath in sorted(related): - filename = Path(filepath).name - logger.info(f"Found image file {filepath}") - file_list[filename] = filepath - if working_directory: - shutil.copy(filepath, working_directory / filename) + related_files = get_related_images_files_from_h5(h5_file, logger) + file_list.update(related_files) + return file_list + + +def get_related_images_files_from_h5(h5_file, logger): + file_list = {} + try: + related = set(find_all_references(h5_file)) + except (ValueError, KeyError): + logger.error(f"Could not find files related to {h5_file}", exc_info=True) + image_pattern = str(h5_file).split("master")[0] + "*" + related.union(glob.glob(image_pattern)) + for filepath in sorted(related): + filename = Path(filepath).name + logger.info(f"Found image file {filepath}") + file_list[filename] = filepath return file_list @@ -179,7 +185,7 @@ def decompress_results_file(working_directory, filename, logger): logger.debug(result.stderr) -def get_presigned_urls(minio_client, bucket_name, pid, files, logger): +def get_presigned_urls(minio_client, bucket_name, pid, files, do_upload, logger): if not minio_client.bucket_exists(bucket_name): minio_client.make_bucket(bucket_name) else: @@ -188,7 +194,7 @@ def get_presigned_urls(minio_client, bucket_name, pid, files, logger): s3_urls = {} store_objects = [obj.object_name for obj in minio_client.list_objects(bucket_name)] for filepath in files: - filename = "_".join([pid, Path(filepath).name]) + filename = f"{pid}_{Path(filepath).name}" file_size = Path(filepath).stat().st_size upload_file = True if filename in store_objects: @@ -202,7 +208,7 @@ def get_presigned_urls(minio_client, bucket_name, pid, files, logger): f"Reuploading {filename} because of mismatch in file size: Expected {file_size}, got {result.size}" ) upload_file = True - if upload_file: + if upload_file and do_upload: logger.info(f"Uploading file {filename} into object store.") timestamp = time.perf_counter() minio_client.fput_object( @@ -224,24 +230,17 @@ def get_presigned_urls(minio_client, bucket_name, pid, files, logger): logger.info( f"Data transfer rate for {filename} object: {8e-9 * file_size / timestamp:.3f}Gb/s" ) - s3_urls[filename] = { - "url": minio_client.presigned_get_object( - bucket_name, filename, expires=URL_EXPIRE - ), - "size": file_size, - } + if not (upload_file and not do_upload): + s3_urls[filename] = 
{ + "url": minio_client.presigned_get_object( + bucket_name, filename, expires=URL_EXPIRE + ), + "size": file_size, + } logger.info(f"File URLs: {s3_urls}") return s3_urls -def get_presigned_urls_images(minio_client, bucket_name, pid, images, logger): - image_files = get_image_files(None, images, logger) - s3_urls = get_presigned_urls( - minio_client, bucket_name, pid, image_files.values(), logger - ) - return s3_urls - - def store_results_in_s3(minio_client, bucket_name, pfx, output_directory, logger): compressed_results_files = compress_results_directories( output_directory.parent, @@ -255,6 +254,7 @@ def store_results_in_s3(minio_client, bucket_name, pfx, output_directory, logger bucket_name, pfx, compressed_results_files, + True, logger, ) for filename in compressed_results_files: @@ -302,3 +302,48 @@ def retrieve_file_with_url(filename, url, logger): retries_left -= 1 time.sleep(5) c.close() + + +def update_dcid_info_file(minio_client, bucket_name, dcid, status, rpid, logger): + # Write {dcid}_info file that contains list of processingjobid values for all job invocations + # requiring data for a given {dcid} value. Every new processing job invocation adds corresponding + # processingjobid value to the list. + obj_pid = f"{dcid}_info" + dc_objects = [obj.object_name for obj in minio_client.list_objects(bucket_name)] + dcid_info = {"status": 0, "pid": []} + response_info = None + try: + response = None + if obj_pid in dc_objects: + response = minio_client.get_object(bucket_name, obj_pid) + if response: + response_info = json.loads(response.data.decode()) + dcid_info.update(response_info) + if status is not None: + dcid_info["status"] = status or dcid_info["status"] + if rpid is not None: + if rpid > 0: + dcid_info["pid"].append(rpid) + else: + try: + dcid_info["pid"].remove(abs(rpid)) + except ValueError: + logger.error( + f"{rpid} value not in info file list: {dcid_info['pid']}" + ) + + if status is not None or rpid is not None: + str_buffer = io.BytesIO(json.dumps(dcid_info).encode()) + buffer_length = str_buffer.getbuffer().nbytes + result = minio_client.put_object( + bucket_name, obj_pid, str_buffer, buffer_length + ) + logger.debug( + f"Written {result.object_name} object for {rpid} procecessingJobId value; etag: {result.etag}", + ) + finally: + if response: + response.close() + response.release_conn() + + return response_info diff --git a/src/dlstbx/wrapper/autoPROC.py b/src/dlstbx/wrapper/autoPROC.py index 9c30e4b6c..f087e4499 100644 --- a/src/dlstbx/wrapper/autoPROC.py +++ b/src/dlstbx/wrapper/autoPROC.py @@ -473,23 +473,6 @@ def send_results_to_ispyb( ) return success - def setup(self, working_directory: Path, params: dict): - # Create working directory with symbolic link - if params.get("create_symlink"): - dlstbx.util.symlink.create_parent_symlink( - working_directory, params["create_symlink"], levels=1 - ) - - if images := params.get("s3echo_upload"): - try: - image_files = iris.get_image_files(None, images, self.log) - self.recwrap.environment.update({"s3echo_upload": image_files}) - except Exception: - self.log.exception("Error uploading image files to S3 Echo") - return False - - return True - def run_autoPROC(self, working_directory: Path, params: dict): subprocess_directory = working_directory / "autoPROC" subprocess_directory.mkdir(parents=True, exist_ok=True) @@ -626,6 +609,9 @@ def report(self, working_directory: Path, params: dict, success: bool): results_directory = Path(params["results_directory"]) / "autoPROC" results_directory.mkdir(parents=True, exist_ok=True) 
if params.get("create_symlink"): + dlstbx.util.symlink.create_parent_symlink( + working_directory, params["create_symlink"], levels=1 + ) dlstbx.util.symlink.create_parent_symlink( os.fspath(results_directory), params["create_symlink"] ) @@ -803,12 +789,9 @@ def run(self): ) stage = params.get("stage") - assert stage in {None, "setup", "run", "report"} + assert stage in {None, "run", "report"} success = True - if stage in {None, "setup"}: - success = self.setup(working_directory, params) - if stage in {None, "run"} and success: success = self.run_autoPROC(working_directory, params) diff --git a/src/dlstbx/wrapper/xia2.py b/src/dlstbx/wrapper/xia2.py index 6f8bbfe35..1eaa16032 100644 --- a/src/dlstbx/wrapper/xia2.py +++ b/src/dlstbx/wrapper/xia2.py @@ -135,23 +135,6 @@ def send_results_to_ispyb( self.recwrap.send_to("ispyb", {"ispyb_command_list": ispyb_command_list}) self.log.info("Sent %d commands to ISPyB", len(ispyb_command_list)) - def setup(self, working_directory: Path, params: dict): - # Create symbolic link - if params.get("create_symlink"): - dlstbx.util.symlink.create_parent_symlink( - working_directory, params["create_symlink"], levels=1 - ) - - if images := params.get("s3echo_upload"): - try: - image_files = iris.get_image_files(None, images, self.log) - self.recwrap.environment.update({"s3echo_upload": image_files}) - except Exception: - self.log.exception("Error uploading image files to S3 Echo") - return False - - return True - def run_xia2(self, working_directory: Path, params: dict): if s3_urls := self.recwrap.environment.get("s3_urls"): # Logger for recording data transfer rates from S3 Echo object store @@ -275,6 +258,9 @@ def report(self, working_directory: Path, params: dict, success: bool) -> bool: results_directory = Path(params["results_directory"]) / params["program_name"] results_directory.mkdir(parents=True, exist_ok=True) if params.get("create_symlink"): + dlstbx.util.symlink.create_parent_symlink( + working_directory, params["create_symlink"], levels=1 + ) dlstbx.util.symlink.create_parent_symlink( results_directory, params["create_symlink"] ) @@ -480,15 +466,12 @@ def run(self): ) stage = params.get("stage") - assert stage in {None, "setup", "run", "report"} + assert stage in {None, "run", "report"} pipeline = params["xia2"].get("pipeline") params["program_name"] = f"xia2-{pipeline}" if pipeline else "xia2" success = True - if stage in {None, "setup"}: - success = self.setup(working_directory, params) - if stage in {None, "run"}: success = self.run_xia2(working_directory, params)
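
The cloudbursting changes in src/dlstbx/services/mimas.py split one large boolean expression into named predicates. As a reading aid only, the sketch below restates that logic as two pure functions over the cluster statistics dictionary; the names global_gate and group_gate are hypothetical and do not exist in dlstbx.

# Reading aid only -- not part of the patch.  The two gates applied in
# DlstbxMimas.get_cloudbursting_spec(), written as pure functions.

def global_gate(stats: dict, timeout_threshold: float, s3echo_quota: int) -> bool:
    """Cloudbursting is only considered when IRIS and S3 Echo statistics are
    fresh and the S3 Echo usage is below quota; otherwise no cloud spec is built."""
    is_iris_live = stats["iris"]["last_cluster_update"] > timeout_threshold
    is_s3echo_live = stats["s3echo"]["last_cluster_update"] > timeout_threshold
    is_s3echo_quota = stats["s3echo"]["total"] < s3echo_quota
    return is_iris_live and is_s3echo_live and is_s3echo_quota


def group_gate(stats: dict, timeout_threshold: float, max_jobs_waiting: dict) -> bool:
    """A zocalo.mimas.cloud group bursts when Slurm is oversubscribed (or its
    statistics are stale) while IRIS still has capacity for more waiting jobs."""
    is_slurm_max_jobs = stats["slurm"]["jobs_waiting"] > max_jobs_waiting["slurm"]
    is_slurm_timeout = stats["slurm"]["last_cluster_update"] < timeout_threshold
    is_iris_max_jobs = stats["iris"]["jobs_waiting"] < max_jobs_waiting["iris"]
    return (is_slurm_max_jobs or is_slurm_timeout) and is_iris_max_jobs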
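The garbage-collection protocol shared by S3EchoCollector and update_dcid_info_file() is easiest to follow as a worked lifecycle of the {dcid}_info object. The sketch below is illustrative only: it replaces the MinIO bucket with an in-memory dict and uses a hypothetical update_info helper, but it mirrors the update rules introduced in src/dlstbx/util/iris.py (status stays 0 while uploads are pending, becomes 1 on success or -1 on failure; the pid list records processing job ids still needing the data, and a negative rpid removes that id).

# Illustrative sketch only -- not part of the patch.
import json


def update_info(store: dict, dcid: int, status=None, rpid=None):
    """Hypothetical stand-in for update_dcid_info_file(): returns the state
    *before* the update, writing a new state only if status or rpid is given."""
    key = f"{dcid}_info"
    previous = json.loads(store[key]) if key in store else None
    info = {"status": 0, "pid": []}
    if previous:
        info.update(previous)
    if status is not None:
        info["status"] = status or info["status"]
    if rpid is not None:
        if rpid > 0:
            info["pid"].append(rpid)
        elif abs(rpid) in info["pid"]:
            info["pid"].remove(abs(rpid))
    if status is not None or rpid is not None:
        store[key] = json.dumps(info)
    return previous


bucket: dict[str, str] = {}
update_info(bucket, 12345, status=0, rpid=901)   # on_start: job 901 registers interest
update_info(bucket, 12345, rpid=902)             # a second job also needs the data
update_info(bucket, 12345, status=1)             # uploader: all files uploaded
info = update_info(bucket, 12345)                # on_end (job 901): read current state
if info["status"] == -1 or (info["status"] == 1 and info["pid"] == [901]):
    pass                                         # safe to delete all 12345_* objects
else:
    update_info(bucket, 12345, rpid=-901)        # others still registered: only deregister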
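S3EchoUploader.on_upload_dcid() transfers at most one outstanding file per message and then checkpoints the message with the accumulated s3_urls until everything has been uploaded. The sketch below is a simplified, illustrative rendition of that pattern: process_message is a hypothetical stand-in for one delivery of the recipe message, and plain filename matching replaces the dcid-prefixed object-name matching used by the service.

# Illustrative sketch only -- not part of the patch.
def process_message(to_upload: dict[str, str], s3_urls: dict[str, str]):
    """to_upload maps filename -> filepath; s3_urls accumulates objects already
    uploaded.  Each call uploads at most one outstanding file and either
    'checkpoints' (requeues) the message or reports success."""
    outstanding = sorted(set(to_upload) - set(s3_urls))
    if not outstanding:
        return "success", s3_urls              # nothing left to do
    filename = outstanding[0]
    s3_urls[filename] = f"https://example.invalid/{filename}"  # upload + presign
    if len(s3_urls) < len(to_upload):
        return "checkpoint", s3_urls           # redelivered later with updated state
    return "success", s3_urls


state: dict[str, str] = {}
files = {"a_master.h5": "/tmp/a_master.h5", "a_000001.h5": "/tmp/a_000001.h5"}
while True:
    outcome, state = process_message(files, state)
    if outcome == "success":
        break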
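S3EchoWatcher treats a file as uploaded once an object named {dcid}_{filename} exists in the bucket and its size matches the file on disk; until then it keeps re-checkpointing the message. The helper below is an illustrative, synchronous version of that check (poll_upload is a hypothetical name; the service relies on message redelivery with a delay rather than a sleep loop).

# Illustrative sketch only -- not part of the patch.
import time
from pathlib import Path

import minio


def poll_upload(client: minio.Minio, bucket: str, dcid: int, filepath: str,
                delay: float = 5.0, attempts: int = 60) -> bool:
    """Return True once the object {dcid}_{filename} exists in the bucket and
    its size matches the local file, i.e. the upload has completed."""
    object_name = f"{dcid}_{Path(filepath).name}"
    local_size = Path(filepath).stat().st_size
    for _ in range(attempts):
        try:
            remote = client.stat_object(bucket, object_name)
        except minio.error.S3Error:
            pass                      # object not there yet: upload not started
        else:
            if remote.size == local_size:
                return True           # upload finished
            # size mismatch: upload still in progress, keep waiting
        time.sleep(delay)
    return False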