From 7ce80ca5574c44470383b5182b1510296d6ea02b Mon Sep 17 00:00:00 2001 From: rowanwalker96 Date: Wed, 17 Dec 2025 09:06:24 +0000 Subject: [PATCH 1/7] Init --- setup.py | 1 + src/dlstbx/services/trigger_xchem.py | 34 +++++++++++++++------------- 2 files changed, 19 insertions(+), 16 deletions(-) diff --git a/setup.py b/setup.py index b948ad46b..6dfdd39db 100644 --- a/setup.py +++ b/setup.py @@ -80,6 +80,7 @@ "pandda_xchem = dlstbx.wrapper.pandda_xchem:PanDDAWrapper", "pandda_post = dlstbx.wrapper.pandda_post:PanDDApostWrapper", "pandda_rhofit = dlstbx.wrapper.pandda_rhofit:PanDDARhofitWrapper", + "pipedream = dlstbx.wrapper.pipedream_xchem:PipedreamWrapper", "phaser_ellg = dlstbx.wrapper.phaser_ellg:PhasereLLGWrapper", "rlv = dlstbx.wrapper.rlv:RLVWrapper", "scaleit = dlstbx.wrapper.scaleit:ScaleitWrapper", diff --git a/src/dlstbx/services/trigger_xchem.py b/src/dlstbx/services/trigger_xchem.py index 323e612e6..3c60d76c4 100644 --- a/src/dlstbx/services/trigger_xchem.py +++ b/src/dlstbx/services/trigger_xchem.py @@ -1,5 +1,7 @@ from __future__ import annotations +import ast +import re import json import pathlib import shutil @@ -260,7 +262,7 @@ def trigger_pandda_xchem( subdir / "processing/database" / "soakDBDataFile.sqlite" ) con = sqlite3.connect( - f"file:{db_path}?mode=ro", uri=True, timeout=20 + f"file:{db_path}?mode=ro", uri=True, timeout=10 ) cur = con.cursor() cur.execute("SELECT Protein FROM soakDB") @@ -271,12 +273,6 @@ def trigger_pandda_xchem( # visit = dir.parts[-1] expt_yaml = {} expt_yaml["data"] = {"acronym": name} - # expt_yaml["autoprocessing"] = {} - # expt_yaml["autoprocessing"]["pandda"] = { - # "prerun-threshold": 300, - # "heuristic": "default", - # } - with open(subdir / ".user.yaml", "w") as f: yaml.dump(expt_yaml, f) @@ -306,10 +302,11 @@ def trigger_pandda_xchem( return {"success": True} processing_dir = xchem_visit_dir / "processing" - db = xchem_visit_dir / "processing/database" / "soakDBDataFile.sqlite" + processed_dir = xchem_visit_dir / "processed" + db = processing_dir / 'database' / "soakDBDataFile.sqlite" - # Make a copy of the most recent sqlite for reading - # db_copy = xchem_visit_dir / "processing/database" / "auto_soakDBDataFile.sqlite" + # Make a copy of the most recent sqlite for reading? 
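Just above, the trigger opens soakDB with a read-only URI (and here considers copying the file before reading it). As a minimal standalone sketch of that access pattern, with an illustrative visit path rather than one taken from this patch:

    import sqlite3

    # Open soakDB read-only; mode=ro takes no write locks, so XChem tooling
    # can keep writing, and the timeout rides out short-lived locks held by
    # a concurrent writer.
    db_path = "/dls/labxchem/data/lb00000-1/processing/database/soakDBDataFile.sqlite"
    con = sqlite3.connect(f"file:{db_path}?mode=ro", uri=True, timeout=10)
    try:
        (protein,) = con.execute("SELECT Protein FROM soakDB").fetchone()
    finally:
        con.close()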
+ # db_copy = xchem_visit_dir / "processed/database" / "auto_soakDBDataFile.sqlite" # if not db_copy.exists() or (db.stat().st_mtime != db_copy.stat().st_mtime): # shutil.copy2(str(db), str(db_copy)) # self.log.info(f"Made a copy of {db}, auto_soakDBDataFile.sqlite") @@ -495,7 +492,7 @@ def trigger_pandda_xchem( * df["nTotalUniqueObservations"].astype(float) ) # I/sigI*completeness*# unique reflections - df = df[["autoProcScalingId", "heuristic"]].copy() + df = df[["processingPrograms","autoProcScalingId", "heuristic"]].copy() scaling_ids = df["autoProcScalingId"].tolist() # find associated dimple jobs @@ -562,6 +559,10 @@ def trigger_pandda_xchem( f"Chosen dataset to take forward: {chosen_dataset_path} for dcid {dcid}" ) scaling_id = int(df3["autoProcScalingId"][0]) + environment = df3['processingEnvironment'][0] + upstream_mtz = ast.literal_eval(re.search(r"data=(\[[^\]]*\])", environment).group(1))[0] + upstream_proc = df[df['autoProcScalingId']==scaling_id]['processingPrograms'].item() + pdb = chosen_dataset_path + "/final.pdb" mtz = chosen_dataset_path + "/final.mtz" @@ -583,7 +584,7 @@ def trigger_pandda_xchem( # Read XChem SQLite for ligand info try: - conn = sqlite3.connect(f"file:{db}?mode=ro", uri=True, timeout=20) + conn = sqlite3.connect(f"file:{db}?mode=ro", uri=True, timeout=10) df = pd.read_sql_query( f"SELECT * from mainTable WHERE Puck = '{code}' AND PuckPosition = {location} AND CrystalName = '{dtag}'", conn, @@ -619,10 +620,10 @@ def trigger_pandda_xchem( self.log.info( f"Puck {code}, puck position {location} has no corresponding CompoundSMILES, considering as an apo dataset" ) + return {"success": True} # 3. Create the dataset directory - tmp_dir = pathlib.Path("/dls/tmp/xchem_diff2ir") # TEMPORARY RESULTS DIR - processing_dir = tmp_dir / xchem_visit_dir.parts[-1] + processing_dir = xchem_visit_dir / 'processed' model_dir = processing_dir / "analysis" / "auto_model_building" dataset_dir = model_dir / dtag compound_dir = dataset_dir / "compound" @@ -632,15 +633,16 @@ def trigger_pandda_xchem( dataset_count = sum(1 for p in model_dir.iterdir() if p.is_dir()) self.log.info(f"Dataset count is: {dataset_count}") - # Copy the dimple files of the selected dataset + # Copy the dimple & upstream files of the selected dataset shutil.copy(pdb, str(dataset_dir / "dimple.pdb")) shutil.copy(mtz, str(dataset_dir / "dimple.mtz")) + shutil.copy(upstream_mtz, str(dataset_dir / pathlib.Path(upstream_mtz).parts[-1])) with open(compound_dir / f"{CompoundCode}.smiles", "w") as smi_file: smi_file.write(CompoundSMILES) # 4. 
Job launch logic
-
+
     comparator_threshold = parameters.comparator_threshold

     if dataset_count < comparator_threshold:

From 9dd002d589eb18271ff20a6599d19e7d278a337f Mon Sep 17 00:00:00 2001
From: rowanwalker96
Date: Thu, 18 Dec 2025 16:45:34 +0000
Subject: [PATCH 2/7] Combine PanDDA2 & Pipedream trigger

---
 src/dlstbx/services/trigger_xchem.py | 146 ++++++++++++++-------------
 src/dlstbx/wrapper/pandda_xchem.py   |   4 +-
 src/dlstbx/wrapper/pipedream.py      |  93 +++++++++++++++++
 3 files changed, 172 insertions(+), 71 deletions(-)
 create mode 100644 src/dlstbx/wrapper/pipedream.py

diff --git a/src/dlstbx/services/trigger_xchem.py b/src/dlstbx/services/trigger_xchem.py
index 3c60d76c4..84df6b007 100644
--- a/src/dlstbx/services/trigger_xchem.py
+++ b/src/dlstbx/services/trigger_xchem.py
@@ -1,9 +1,9 @@
 from __future__ import annotations

 import ast
-import re
 import json
 import pathlib
+import re
 import shutil
 import sqlite3
 from datetime import datetime, timedelta
@@ -159,6 +159,34 @@ def trigger(self, rw, header, message):

         return rw.transport.transaction_commit(txn)

+    def upsert_proc(self, rw, dcid, procname, recipe_parameters):
+        jp = self.ispyb.mx_processing.get_job_params()
+        jp["automatic"] = True
+        # jp["comments"] = parameters.comment
+        jp["datacollectionid"] = dcid
+        jp["display_name"] = procname
+        jp["recipe"] = f"postprocessing-{procname.lower()}"
+        self.log.info(jp)
+        jobid = self.ispyb.mx_processing.upsert_job(list(jp.values()))
+        self.log.debug(f"{procname} trigger: generated JobID {jobid}")
+
+        for key, value in recipe_parameters.items():
+            jpp = self.ispyb.mx_processing.get_job_parameter_params()
+            jpp["job_id"] = jobid
+            jpp["parameter_key"] = key
+            jpp["parameter_value"] = value
+            jppid = self.ispyb.mx_processing.upsert_job_parameter(list(jpp.values()))
+            self.log.debug(
+                f"{procname} trigger: generated JobParameterID {jppid} with {key}={value}"
+            )
+
+        self.log.debug(f"{procname} trigger: Processing job {jobid} created")
+
+        message = {"recipes": [], "parameters": {"ispyb_process": jobid}}
+        rw.transport.send("processing_recipe", message)
+
+        self.log.info(f"{procname} trigger: Processing job {jobid} triggered")
+
     @pydantic.validate_call(config={"arbitrary_types_allowed": True})
     def trigger_pandda_xchem(
         self,
@@ -197,6 +225,7 @@ def trigger_pandda_xchem(

         dcid = parameters.dcid
         scaling_id = parameters.scaling_id[0]
+        comparator_threshold = parameters.comparator_threshold

         protein_info = get_protein_for_dcid(parameters.dcid, session)
         # protein_id = getattr(protein_info, "proteinId")
@@ -206,21 +235,21 @@ def trigger_pandda_xchem(
         query = (session.query(Proposal)).filter(Proposal.proposalId == proposal_id)
         proposal = query.first()

-        # 0. Check that this is an XChem expt, find .sqlite database
+        # 0. 
Check that this is an XChem expt & locate .SQLite database if proposal.proposalCode not in {"lb"}: # need to handle industrial 'sw' also self.log.debug( f"Not triggering PanDDA2 pipeline for dcid={dcid} with proposal_code={proposal.proposalCode}" ) return {"success": True} - # TEMPORARY, OPENBIND TEST VISIT + # TEMPORARY, FILTER BY OPENBIND VISIT if proposal.proposalNumber not in {"42888"}: self.log.debug( f"Not triggering PanDDA2 pipeline for dcid={dcid}, only accepting data collections from lb42888 during test phase" ) return {"success": True} - # Find corresponding xchem visit directory and database + # Find corresponding XChem visit directory and database xchem_dir = pathlib.Path( f"/dls/labxchem/data/{proposal.proposalCode}{proposal.proposalNumber}" ) @@ -262,7 +291,7 @@ def trigger_pandda_xchem( subdir / "processing/database" / "soakDBDataFile.sqlite" ) con = sqlite3.connect( - f"file:{db_path}?mode=ro", uri=True, timeout=10 + f"file:{db_path}?mode=ro", uri=True, timeout=20 ) cur = con.cursor() cur.execute("SELECT Protein FROM soakDB") @@ -303,10 +332,10 @@ def trigger_pandda_xchem( processing_dir = xchem_visit_dir / "processing" processed_dir = xchem_visit_dir / "processed" - db = processing_dir / 'database' / "soakDBDataFile.sqlite" + db = xchem_visit_dir / "processing/database" / "soakDBDataFile.sqlite" - # Make a copy of the most recent sqlite for reading? - # db_copy = xchem_visit_dir / "processed/database" / "auto_soakDBDataFile.sqlite" + # Make a copy of the most recent sqlite for reading + # db_copy = xchem_visit_dir / "processing/database" / "auto_soakDBDataFile.sqlite" # if not db_copy.exists() or (db.stat().st_mtime != db_copy.stat().st_mtime): # shutil.copy2(str(db), str(db_copy)) # self.log.info(f"Made a copy of {db}, auto_soakDBDataFile.sqlite") @@ -492,7 +521,7 @@ def trigger_pandda_xchem( * df["nTotalUniqueObservations"].astype(float) ) # I/sigI*completeness*# unique reflections - df = df[["processingPrograms","autoProcScalingId", "heuristic"]].copy() + df = df[["autoProcScalingId", "heuristic"]].copy() scaling_ids = df["autoProcScalingId"].tolist() # find associated dimple jobs @@ -559,14 +588,16 @@ def trigger_pandda_xchem( f"Chosen dataset to take forward: {chosen_dataset_path} for dcid {dcid}" ) scaling_id = int(df3["autoProcScalingId"][0]) - environment = df3['processingEnvironment'][0] - upstream_mtz = ast.literal_eval(re.search(r"data=(\[[^\]]*\])", environment).group(1))[0] - upstream_proc = df[df['autoProcScalingId']==scaling_id]['processingPrograms'].item() - + environment = df3["processingEnvironment"][0] + upstream_mtz = ast.literal_eval( + re.search(r"data=(\[[^\]]*\])", environment).group(1) + )[0] + self.log.info(f"Chosen mtz for dcid {dcid} is {upstream_mtz}") + # upstream_proc = df[df['autoProcScalingId']==scaling_id]['processingPrograms'].item() # fails pdb = chosen_dataset_path + "/final.pdb" mtz = chosen_dataset_path + "/final.mtz" - self.log.debug("PanDDA2 trigger: Starting") + self.log.debug("PanDDA2/Pipedream trigger: Starting") # 2. Get ligand information, location & container code @@ -602,7 +633,7 @@ def trigger_pandda_xchem( if len(df) != 1: self.log.info( - f"Unique row in .sqlite for dcid {dcid}, puck {code}, puck position {location} cannot be found in database {db}, can't continue." + f"Unique row in .sqlite for dtag {dtag}, puck {code}, puck position {location} cannot be found in database {db}, can't continue." 
) return {"success": True} @@ -613,18 +644,15 @@ def trigger_pandda_xchem( if LibraryName == "DMSO": # exclude DMSO screen from PanDDA analysis self.log.info( - f"Puck {code}, puck position {location} is from DMSO solvent screen, excluding from PanDDA analysis" + f"{dtag} is DMSO solvent screen, excluding from PanDDA analysis" ) return {"success": True} elif not CompoundSMILES: - self.log.info( - f"Puck {code}, puck position {location} has no corresponding CompoundSMILES, considering as an apo dataset" - ) + self.log.info(f"{dtag} has no corresponding CompoundSMILES, skipping...") return {"success": True} # 3. Create the dataset directory - processing_dir = xchem_visit_dir / 'processed' - model_dir = processing_dir / "analysis" / "auto_model_building" + model_dir = processed_dir / "analysis" / "auto_model_building" dataset_dir = model_dir / dtag compound_dir = dataset_dir / "compound" self.log.info(f"Creating directory {dataset_dir}") @@ -633,74 +661,54 @@ def trigger_pandda_xchem( dataset_count = sum(1 for p in model_dir.iterdir() if p.is_dir()) self.log.info(f"Dataset count is: {dataset_count}") - # Copy the dimple & upstream files of the selected dataset + # Copy the dimple files of the selected dataset shutil.copy(pdb, str(dataset_dir / "dimple.pdb")) shutil.copy(mtz, str(dataset_dir / "dimple.mtz")) - shutil.copy(upstream_mtz, str(dataset_dir / pathlib.Path(upstream_mtz).parts[-1])) + shutil.copy( + upstream_mtz, str(dataset_dir / pathlib.Path(upstream_mtz).parts[-1]) + ) with open(compound_dir / f"{CompoundCode}.smiles", "w") as smi_file: smi_file.write(CompoundSMILES) # 4. Job launch logic - - comparator_threshold = parameters.comparator_threshold - - if dataset_count < comparator_threshold: - self.log.info( - f"Dataset dataset_count {dataset_count} < PanDDA2 comparator dataset threshold of {comparator_threshold}, skipping for now..." 
- ) - return {"success": True} - elif dataset_count == comparator_threshold: - n_datasets = len(dataset_list) - with open(model_dir / ".batch.json", "w") as f: - json.dump(dataset_list, f) - self.log.info( - f"Dataset dataset_count {dataset_count} = comparator_threshold of {comparator_threshold} datasets, launching PanDDA2 array job" - ) - elif dataset_count > comparator_threshold: - n_datasets = 1 - self.log.info(f"Launching single PanDDA2 job for dtag {dtag}") - - self.log.debug("PanDDA2 trigger: Starting") - pandda_parameters = { - "dcid": dcid, # - "processing_directory": str(processing_dir), + recipe_parameters = { + "dcid": dcid, + "processed_directory": str(processed_dir), "model_directory": str(model_dir), "dataset_directory": str(dataset_dir), "dtag": dtag, - "n_datasets": n_datasets, + "n_datasets": 1, "scaling_id": scaling_id, "comparator_threshold": comparator_threshold, "database_path": str(db), + "upstream_mtz": upstream_mtz, } - jp = self.ispyb.mx_processing.get_job_params() - jp["automatic"] = parameters.automatic - # jp["comments"] = parameters.comment - jp["datacollectionid"] = dcid - jp["display_name"] = "PanDDA2" - jp["recipe"] = "postprocessing-pandda2" - self.log.info(jp) - jobid = self.ispyb.mx_processing.upsert_job(list(jp.values())) - self.log.debug(f"PanDDA2 trigger: generated JobID {jobid}") - - for key, value in pandda_parameters.items(): - jpp = self.ispyb.mx_processing.get_job_parameter_params() - jpp["job_id"] = jobid - jpp["parameter_key"] = key - jpp["parameter_value"] = value - jppid = self.ispyb.mx_processing.upsert_job_parameter(list(jpp.values())) - self.log.debug( - f"PanDDA2 trigger: generated JobParameterID {jppid} with {key}={value}" + if dataset_count < comparator_threshold: + self.log.info( + f"Dataset dataset_count {dataset_count} < comparator dataset threshold of {comparator_threshold}, skipping PanDDA2 for now..." 
) + self.upsert_proc(rw, dcid, "Pipedream", recipe_parameters) + return {"success": True} - self.log.debug(f"PanDDA2_id trigger: Processing job {jobid} created") + elif dataset_count == comparator_threshold: + recipe_parameters["n_datasets"] = len(dataset_list) - message = {"recipes": [], "parameters": {"ispyb_process": jobid}} - rw.transport.send("processing_recipe", message) + with open(model_dir / ".batch.json", "w") as f: + json.dump(dataset_list, f) + + self.log.info( + f"Dataset dataset_count {dataset_count} = comparator dataset threshold of {comparator_threshold}, launching PanDDA2 array job" + ) + self.upsert_proc(rw, dcid, "Pipedream", recipe_parameters) + self.upsert_proc(rw, dcid, "PanDDA2", recipe_parameters) - self.log.info(f"PanDDA2_id trigger: Processing job {jobid} triggered") + elif dataset_count > comparator_threshold: + self.log.info(f"Launching single PanDDA2 job for dtag {dtag}") + self.upsert_proc(rw, dcid, "Pipedream", recipe_parameters) + self.upsert_proc(rw, dcid, "PanDDA2", recipe_parameters) return {"success": True} diff --git a/src/dlstbx/wrapper/pandda_xchem.py b/src/dlstbx/wrapper/pandda_xchem.py index 5e216d2d0..72a8b1616 100644 --- a/src/dlstbx/wrapper/pandda_xchem.py +++ b/src/dlstbx/wrapper/pandda_xchem.py @@ -28,8 +28,8 @@ def run(self): PANDDA_2_DIR = "/dls_sw/i04-1/software/PanDDA2" # database_path = Path(params.get("database_path")) - processing_dir = Path(params.get("processing_directory")) - analysis_dir = Path(processing_dir / "analysis") + processed_dir = Path(params.get("processed_directory")) + analysis_dir = Path(processed_dir / "analysis") model_dir = Path(params.get("model_directory")) auto_panddas_dir = Path(analysis_dir / "auto_pandda2") Path(auto_panddas_dir).mkdir(exist_ok=True) diff --git a/src/dlstbx/wrapper/pipedream.py b/src/dlstbx/wrapper/pipedream.py new file mode 100644 index 000000000..ceb2cfa5e --- /dev/null +++ b/src/dlstbx/wrapper/pipedream.py @@ -0,0 +1,93 @@ +from __future__ import annotations + +import sqlite3 +import subprocess +from pathlib import Path + +from dlstbx.wrapper import Wrapper + + +class PipedreamWrapper(Wrapper): + _logger_name = "dlstbx.wrap.pipedream" + + def run(self): + assert hasattr(self, "recwrap"), "No recipewrapper object found" + self.log.info( + f"Running recipewrap file {self.recwrap.recipe_step['parameters']['recipewrapper']}" + ) + + params = self.recwrap.recipe_step["job_parameters"] + + # database_path = Path(params.get("database_path")) + processed_dir = Path(params.get("processed_directory")) + analysis_dir = Path(processed_dir / "analysis") + upstream_mtz = params.get("upstream_mtz") + dimple_pdb = "" + dimple_mtz = "" + out_dir = analysis_dir / "Pipedream" / dtag + + dtag = params.get("dtag") + + self.log.info(f"Processing dtag: {dtag}") + + pipedream_command = f"module load buster; module load graphviz; \ + export BDG_TOOL_MOGUL=/dls_sw/apps/CSDS/2024.1.0/ccdc-software/mogul/bin/mogul; \ + /dls_sw/apps/GPhL/BUSTER/20250717/scripts/pipedream \ + -nolmr \ + -hklin {upstream_mtz} \ + -xyzin {dimple_pdb} \ + -hklref {dimple_mtz} \ + -d {out_dir} \ + -mrefine TLSbasic,WaterUpdatePkmaps \ + -keepwater \ + -remediate \ + -sidechainrebuild \ + -runpepflip \ + -rhocommands \ + -xclusters \ + -nochirals " + # -rhofit ligand.cif + + try: + result = subprocess.run( + pipedream_command, + shell=True, + capture_output=True, + text=True, + cwd=analysis_dir, + check=True, + timeout=params.get("timeout-minutes") * 60, + ) + + except subprocess.CalledProcessError as e: + self.log.error(f"PanDDA2 command: 
'{pipedream_command}' failed") + self.log.info(e.stdout) + self.log.error(e.stderr) + return False + + self.log.info(f"Pipedream finished successfully for dtag {dtag}") + return True + + def update_data_source(self, db_dict, dtag, database_path): + sql = ( + "UPDATE mainTable SET " + + ", ".join([f"{k} = :{k}" for k in db_dict]) + + f" WHERE CrystalName = '{dtag}'" + ) + conn = sqlite3.connect(database_path) + # conn.execute("PRAGMA journal_mode=WAL;") + cursor = conn.cursor() + cursor.execute(sql, db_dict) + conn.commit() + + # Integrate back with XCE via datasource + # db_dict = {} + # db_dict["DimplePANDDAwasRun"] = True + # # db_dict["DimplePANDDAreject"] = False + # db_dict["DimplePANDDApath"] = str(auto_panddas_dir / "processed_datasets") + + # try: + # self.update_data_source(db_dict, dtag, database_path) + # self.log.info(f"Updated sqlite database for dataset {dtag}") + # except Exception as e: + # self.log.info(f"Could not update sqlite database for dataset {dtag}: {e}") From 2102c73a7088a09ef0b379039043d18397a6dc15 Mon Sep 17 00:00:00 2001 From: rowanwalker96 Date: Fri, 19 Dec 2025 11:49:31 +0000 Subject: [PATCH 3/7] Fix pipedream wrapper --- setup.py | 2 +- src/dlstbx/services/trigger_xchem.py | 37 +++---------------- .../{pipedream.py => pipedream_xchem.py} | 18 +++++---- 3 files changed, 18 insertions(+), 39 deletions(-) rename src/dlstbx/wrapper/{pipedream.py => pipedream_xchem.py} (85%) diff --git a/setup.py b/setup.py index 6dfdd39db..97d80d313 100644 --- a/setup.py +++ b/setup.py @@ -80,7 +80,7 @@ "pandda_xchem = dlstbx.wrapper.pandda_xchem:PanDDAWrapper", "pandda_post = dlstbx.wrapper.pandda_post:PanDDApostWrapper", "pandda_rhofit = dlstbx.wrapper.pandda_rhofit:PanDDARhofitWrapper", - "pipedream = dlstbx.wrapper.pipedream_xchem:PipedreamWrapper", + "pipedream_xchem = dlstbx.wrapper.pipedream_xchem:PipedreamWrapper", "phaser_ellg = dlstbx.wrapper.phaser_ellg:PhasereLLGWrapper", "rlv = dlstbx.wrapper.rlv:RLVWrapper", "scaleit = dlstbx.wrapper.scaleit:ScaleitWrapper", diff --git a/src/dlstbx/services/trigger_xchem.py b/src/dlstbx/services/trigger_xchem.py index 84df6b007..2db01951c 100644 --- a/src/dlstbx/services/trigger_xchem.py +++ b/src/dlstbx/services/trigger_xchem.py @@ -66,7 +66,7 @@ class PanDDA_PostParameters(pydantic.BaseModel): automatic: Optional[bool] = False comment: Optional[str] = None scaling_id: list[int] - processing_directory: str + processed_directory: str timeout: float = pydantic.Field(default=60, alias="timeout-minutes") @@ -744,7 +744,7 @@ def trigger_pandda_xchem_post( dcid = parameters.dcid scaling_id = parameters.scaling_id[0] - processing_directory = pathlib.Path(parameters.processing_directory) + processed_directory = pathlib.Path(parameters.processed_directory) _, ispyb_info = dlstbx.ispybtbx.ispyb_filter({}, {"ispyb_dcid": dcid}, session) visit = ispyb_info.get("ispyb_visit", "") @@ -776,7 +776,7 @@ def trigger_pandda_xchem_post( ) .filter(ProcessingJob.dataCollectionId.in_(dcids)) .filter(ProcessingJob.automatic == True) # noqa E711 - .filter(AutoProcProgram.processingPrograms == "PanDDA2_post") + .filter(AutoProcProgram.processingPrograms == "PanDDA2-post") .filter(AutoProcProgram.recordTimeStamp > min_start_time) .filter( or_( @@ -794,37 +794,12 @@ def trigger_pandda_xchem_post( self.log.debug("PanDDA2 postrun trigger: Starting") - pandda_parameters = { + recipe_parameters = { "dcid": dcid, # - "processing_directory": str(processing_directory), + "processed_directory": str(processed_directory), "scaling_id": scaling_id, } - jp = 
self.ispyb.mx_processing.get_job_params() - jp["automatic"] = parameters.automatic - # jp["comments"] = parameters.comment - jp["datacollectionid"] = dcid - jp["display_name"] = "PanDDA2_post" - jp["recipe"] = "postprocessing-pandda2-post" - self.log.info(jp) - jobid = self.ispyb.mx_processing.upsert_job(list(jp.values())) - self.log.debug(f"PanDDA2 postrun trigger: generated JobID {jobid}") - - for key, value in pandda_parameters.items(): - jpp = self.ispyb.mx_processing.get_job_parameter_params() - jpp["job_id"] = jobid - jpp["parameter_key"] = key - jpp["parameter_value"] = value - jppid = self.ispyb.mx_processing.upsert_job_parameter(list(jpp.values())) - self.log.debug( - f"PanDDA2 trigger: generated JobParameterID {jppid} with {key}={value}" - ) - - self.log.debug(f"PanDDA2_post trigger: Processing job {jobid} created") - - message = {"recipes": [], "parameters": {"ispyb_process": jobid}} - rw.transport.send("processing_recipe", message) - - self.log.info(f"PanDDA2_post trigger: Processing job {jobid} triggered") + self.upsert_proc(rw, dcid, "PanDDA2-post", recipe_parameters) return {"success": True} diff --git a/src/dlstbx/wrapper/pipedream.py b/src/dlstbx/wrapper/pipedream_xchem.py similarity index 85% rename from src/dlstbx/wrapper/pipedream.py rename to src/dlstbx/wrapper/pipedream_xchem.py index ceb2cfa5e..af8d1706f 100644 --- a/src/dlstbx/wrapper/pipedream.py +++ b/src/dlstbx/wrapper/pipedream_xchem.py @@ -8,7 +8,7 @@ class PipedreamWrapper(Wrapper): - _logger_name = "dlstbx.wrap.pipedream" + _logger_name = "dlstbx.wrap.pipedream_xchem" def run(self): assert hasattr(self, "recwrap"), "No recipewrapper object found" @@ -21,13 +21,17 @@ def run(self): # database_path = Path(params.get("database_path")) processed_dir = Path(params.get("processed_directory")) analysis_dir = Path(processed_dir / "analysis") - upstream_mtz = params.get("upstream_mtz") - dimple_pdb = "" - dimple_mtz = "" - out_dir = analysis_dir / "Pipedream" / dtag - + model_dir = Path(params.get("model_directory")) dtag = params.get("dtag") + dataset_dir = model_dir / dtag + pipedream_dir = analysis_dir / "pipedream" + Path(pipedream_dir).mkdir(parents=True, exist_ok=True) + out_dir = pipedream_dir / dtag + upstream_mtz = params.get("upstream_mtz") + dimple_pdb = dataset_dir / "dimple.pdb" + dimple_mtz = dataset_dir / "dimple.mtz" + self.log.info(f"Processing dtag: {dtag}") pipedream_command = f"module load buster; module load graphviz; \ @@ -60,7 +64,7 @@ def run(self): ) except subprocess.CalledProcessError as e: - self.log.error(f"PanDDA2 command: '{pipedream_command}' failed") + self.log.error(f"Pipedream command: '{pipedream_command}' failed") self.log.info(e.stdout) self.log.error(e.stderr) return False From dc3f6945af93dd0bce0a85843c407dc08ad2b444 Mon Sep 17 00:00:00 2001 From: rowanwalker96 Date: Mon, 5 Jan 2026 11:26:05 +0000 Subject: [PATCH 4/7] Add function to process pdb for pipedream --- src/dlstbx/services/trigger_xchem.py | 16 +++++++--- src/dlstbx/wrapper/pandda_xchem.py | 7 +++-- src/dlstbx/wrapper/pipedream_xchem.py | 45 +++++++++++++++++++++++++++ 3 files changed, 61 insertions(+), 7 deletions(-) diff --git a/src/dlstbx/services/trigger_xchem.py b/src/dlstbx/services/trigger_xchem.py index 379120d01..50db952c4 100644 --- a/src/dlstbx/services/trigger_xchem.py +++ b/src/dlstbx/services/trigger_xchem.py @@ -59,6 +59,7 @@ class PanDDAParameters(pydantic.BaseModel): backoff_delay: float = pydantic.Field(default=45, alias="backoff-delay") backoff_max_try: int = pydantic.Field(default=30, 
alias="backoff-max-try") backoff_multiplier: float = pydantic.Field(default=1.1, alias="backoff-multiplier") + pipedream: Optional[bool] = True class PanDDA_PostParameters(pydantic.BaseModel): @@ -226,6 +227,7 @@ def trigger_pandda_xchem( dcid = parameters.dcid scaling_id = parameters.scaling_id[0] comparator_threshold = parameters.comparator_threshold + pipedream = parameters.pipedream protein_info = get_protein_for_dcid(parameters.dcid, session) # protein_id = getattr(protein_info, "proteinId") @@ -291,7 +293,7 @@ def trigger_pandda_xchem( subdir / "processing/database" / "soakDBDataFile.sqlite" ) con = sqlite3.connect( - f"file:{db_path}?mode=ro", uri=True, timeout=20 + f"file:{db_path}?mode=ro", uri=True, timeout=10 ) cur = con.cursor() cur.execute("SELECT Protein FROM soakDB") @@ -692,7 +694,9 @@ def trigger_pandda_xchem( self.log.info( f"Dataset dataset_count {dataset_count} < comparator dataset threshold of {comparator_threshold}, skipping PanDDA2 for now..." ) - self.upsert_proc(rw, dcid, "Pipedream", recipe_parameters) + + if pipedream: + self.upsert_proc(rw, dcid, "Pipedream", recipe_parameters) return {"success": True} elif dataset_count == comparator_threshold: @@ -704,14 +708,18 @@ def trigger_pandda_xchem( self.log.info( f"Dataset dataset_count {dataset_count} = comparator dataset threshold of {comparator_threshold}, launching PanDDA2 array job" ) - self.upsert_proc(rw, dcid, "Pipedream", recipe_parameters) self.upsert_proc(rw, dcid, "PanDDA2", recipe_parameters) + if pipedream: + self.upsert_proc(rw, dcid, "Pipedream", recipe_parameters) + elif dataset_count > comparator_threshold: self.log.info(f"Launching single PanDDA2 job for dtag {dtag}") - self.upsert_proc(rw, dcid, "Pipedream", recipe_parameters) self.upsert_proc(rw, dcid, "PanDDA2", recipe_parameters) + if pipedream: + self.upsert_proc(rw, dcid, "Pipedream", recipe_parameters) + return {"success": True} @pydantic.validate_call(config={"arbitrary_types_allowed": True}) diff --git a/src/dlstbx/wrapper/pandda_xchem.py b/src/dlstbx/wrapper/pandda_xchem.py index 72a8b1616..0d3c9381b 100644 --- a/src/dlstbx/wrapper/pandda_xchem.py +++ b/src/dlstbx/wrapper/pandda_xchem.py @@ -125,9 +125,10 @@ def run(self): with open(pandda_log, "w") as log_file: log_file.write(result.stdout) - for item in compound_dir.iterdir(): - if item.is_file(): - shutil.copy2(item, ligand_dir / item.name) + # does ligand dir exist if --use_ligand_data=False ? 
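The question in the comment above is why the copy is being disabled here. If the answer turns out to be "sometimes", a guarded copy is an alternative to removing it; a sketch only, reusing the names the surrounding code already has:

    import shutil
    from pathlib import Path

    def copy_compound_files(compound_dir: Path, ligand_dir: Path, log) -> None:
        # With --use_ligand_data=False PanDDA2 may not create ligand_dir,
        # so skip quietly instead of raising FileNotFoundError.
        if not ligand_dir.is_dir():
            log.debug(f"{ligand_dir} does not exist, not copying compound files")
            return
        for item in compound_dir.iterdir():
            if item.is_file():
                shutil.copy2(item, ligand_dir / item.name)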
+ # for item in compound_dir.iterdir(): + # if item.is_file(): + # shutil.copy2(item, ligand_dir / item.name) modelled_dir = dataset_pdir / "modelled_structures" out_dir = modelled_dir / "rhofit" diff --git a/src/dlstbx/wrapper/pipedream_xchem.py b/src/dlstbx/wrapper/pipedream_xchem.py index af8d1706f..17cba079b 100644 --- a/src/dlstbx/wrapper/pipedream_xchem.py +++ b/src/dlstbx/wrapper/pipedream_xchem.py @@ -33,6 +33,7 @@ def run(self): dimple_mtz = dataset_dir / "dimple.mtz" self.log.info(f"Processing dtag: {dtag}") + self.process_pdb_file(dimple_pdb) pipedream_command = f"module load buster; module load graphviz; \ export BDG_TOOL_MOGUL=/dls_sw/apps/CSDS/2024.1.0/ccdc-software/mogul/bin/mogul; \ @@ -72,6 +73,50 @@ def run(self): self.log.info(f"Pipedream finished successfully for dtag {dtag}") return True + def process_pdb_file(self, dimple_pdb: Path, dtag: str): + self.log.info(f"Removing crystallisation components from pdb file for {dtag}") + + if dimple_pdb.exists(): + with open(dimple_pdb, "r", encoding="utf-8") as f: + lines = f.readlines() + + # Count removals by component type + original_count = len(lines) + components_to_remove = ["DMS", "EDO", "GOL", "SO4", "PO4", "PEG"] + removed_counts = dict.fromkeys(components_to_remove, 0) + + kept_lines = [] + for line in lines: + if any(res in line for res in components_to_remove): + # Count which component was found + for comp in components_to_remove: + if comp in line: + removed_counts[comp] += 1 + break + else: + kept_lines.append(line) + + # Write cleaned file + with open(dimple_pdb, "w", encoding="utf-8") as f: + f.writelines(kept_lines) + + removed_total = original_count - len(kept_lines) + if removed_total > 0: + component_summary = ", ".join( + [ + f"{comp}: {count}" + for comp, count in removed_counts.items() + if count > 0 + ] + ) + self.log.debug( + f"Removed {removed_total} lines from {dtag} ({component_summary})" + ) + + else: + self.log.debug(f"Dimple pdb {dimple_pdb} does not exist") + return True + def update_data_source(self, db_dict, dtag, database_path): sql = ( "UPDATE mainTable SET " From e06d7995bc26507a3bdce2c2d02951e0bb48b034 Mon Sep 17 00:00:00 2001 From: rowanwalker96 Date: Tue, 13 Jan 2026 17:09:15 +0000 Subject: [PATCH 5/7] Finish pipedream wrapper --- requirements.conda.txt | 1 + src/dlstbx/services/trigger_xchem.py | 54 +++++-- src/dlstbx/wrapper/pandda_xchem.py | 22 +-- src/dlstbx/wrapper/pipedream_xchem.py | 220 +++++++++++++++++++++++--- 4 files changed, 245 insertions(+), 52 deletions(-) diff --git a/requirements.conda.txt b/requirements.conda.txt index 76c15bd8b..c371fe235 100644 --- a/requirements.conda.txt +++ b/requirements.conda.txt @@ -11,6 +11,7 @@ nxmx pandas pika pint +portalocker procrunner>=1.0.2 prometheus_client pydantic>=2.0.3 diff --git a/src/dlstbx/services/trigger_xchem.py b/src/dlstbx/services/trigger_xchem.py index 50db952c4..d432cd3a9 100644 --- a/src/dlstbx/services/trigger_xchem.py +++ b/src/dlstbx/services/trigger_xchem.py @@ -399,7 +399,7 @@ def trigger_pandda_xchem( return {"success": True} # Stop-gap - min_start_time = datetime.now() - timedelta(minutes=30) + min_start_time = datetime.now() - timedelta(hours=3) query = ( ( @@ -421,7 +421,7 @@ def trigger_pandda_xchem( return {"success": True} # Now check if other upstream pipeline is running and if so, checkpoint (it might fail) - min_start_time = datetime.now() - timedelta(hours=3) + min_start_time = datetime.now() - timedelta(hours=6) query = ( ( session.query(AutoProcProgram, ProcessingJob.dataCollectionId).join( @@ -591,10 
+591,16 @@ def trigger_pandda_xchem(
         )
         scaling_id = int(df3["autoProcScalingId"][0])
         environment = df3["processingEnvironment"][0]
-        upstream_mtz = ast.literal_eval(
-            re.search(r"data=(\[[^\]]*\])", environment).group(1)
-        )[0]
-        self.log.info(f"Chosen mtz for dcid {dcid} is {upstream_mtz}")
+        environment = re.search(r"data=(\[[^\]]*\])", environment)
+
+        if environment:
+            upstream_mtz = ast.literal_eval(environment.group(1))[0]
+            self.log.info(f"Chosen mtz for dcid {dcid} is {upstream_mtz}")
+        else:
+            self.log.info(
+                "Cannot trigger PanDDA2/Pipedream: no environment information"
+            )
+
         # upstream_proc = df[df['autoProcScalingId']==scaling_id]['processingPrograms'].item() # fails
         pdb = chosen_dataset_path + "/final.pdb"
         mtz = chosen_dataset_path + "/final.mtz"
@@ -639,7 +645,6 @@ def trigger_pandda_xchem(
             )
             return {"success": True}

-        # ProteinName = df["ProteinName"].item()
         LibraryName = df["LibraryName"].item()
         CompoundSMILES = df["CompoundSMILES"].item()
         CompoundCode = df["CompoundCode"].item()
@@ -649,14 +654,21 @@ def trigger_pandda_xchem(
                 f"{dtag} is DMSO solvent screen, excluding from PanDDA analysis"
             )
             return {"success": True}
-        elif not CompoundSMILES:
+        elif not CompoundSMILES or str(CompoundSMILES).strip().lower() in [
+            "none",
+            "null",
+            "nan",
+            "",
+        ]:
             self.log.info(
-                f"Puck {code}, puck position {location} has no corresponding CompoundSMILES, apo dataset? Skipping..."
+                f"Puck {code}, puck position {location} has no corresponding CompoundSMILES. Skipping..."
             )
             return {"success": True}

         # 3. Create the dataset directory
-        model_dir = processed_dir / "analysis" / "auto_model_building"
+        analysis_dir = processed_dir / "analysis"
+        pandda_dir = analysis_dir / "pandda2"
+        model_dir = pandda_dir / "model_building"
         dataset_dir = model_dir / dtag
         compound_dir = dataset_dir / "compound"
         self.log.info(f"Creating directory {dataset_dir}")
@@ -675,19 +687,35 @@ def trigger_pandda_xchem(
         with open(compound_dir / f"{CompoundCode}.smiles", "w") as smi_file:
             smi_file.write(CompoundSMILES)

+        # Create separate pipedream directory
+        pipedream_dir = analysis_dir / "pipedream"
+        model_dir_pd = pipedream_dir / "model_building"
+        dataset_dir_pd = model_dir_pd / dtag
+        compound_dir_pd = dataset_dir_pd / "compound"
+        self.log.info(f"Creating directory {dataset_dir_pd}")
+        pathlib.Path(compound_dir_pd).mkdir(parents=True, exist_ok=True)
+        shutil.copy(pdb, str(dataset_dir_pd / "dimple.pdb"))
+        shutil.copy(mtz, str(dataset_dir_pd / "dimple.mtz"))
+        shutil.copy(
+            upstream_mtz, str(dataset_dir_pd / pathlib.Path(upstream_mtz).parts[-1])
+        )
+
+        with open(compound_dir_pd / f"{CompoundCode}.smiles", "w") as smi_file:
+            smi_file.write(CompoundSMILES)
+
         # 4. 
Job launch logic recipe_parameters = { "dcid": dcid, "processed_directory": str(processed_dir), "model_directory": str(model_dir), - "dataset_directory": str(dataset_dir), "dtag": dtag, "n_datasets": 1, "scaling_id": scaling_id, "comparator_threshold": comparator_threshold, "database_path": str(db), - "upstream_mtz": upstream_mtz, + "upstream_mtz": pathlib.Path(upstream_mtz).parts[-1], + "smiles": str(CompoundSMILES), } if dataset_count < comparator_threshold: @@ -762,7 +790,7 @@ def trigger_pandda_xchem_post( visit_number = visit.split("-")[1] # If other PanDDA2 postrun within visit running, quit - min_start_time = datetime.now() - timedelta(hours=0.2) + min_start_time = datetime.now() - timedelta(minutes=20) # from proposal and visit get all dcids query = ( diff --git a/src/dlstbx/wrapper/pandda_xchem.py b/src/dlstbx/wrapper/pandda_xchem.py index 0d3c9381b..71dd89866 100644 --- a/src/dlstbx/wrapper/pandda_xchem.py +++ b/src/dlstbx/wrapper/pandda_xchem.py @@ -30,8 +30,9 @@ def run(self): # database_path = Path(params.get("database_path")) processed_dir = Path(params.get("processed_directory")) analysis_dir = Path(processed_dir / "analysis") + pandda_dir = analysis_dir / "pandda2" model_dir = Path(params.get("model_directory")) - auto_panddas_dir = Path(analysis_dir / "auto_pandda2") + auto_panddas_dir = Path(pandda_dir / "panddas") Path(auto_panddas_dir).mkdir(exist_ok=True) n_datasets = int(params.get("n_datasets")) @@ -66,7 +67,7 @@ def run(self): # acedrg_command = f"module load ccp4; acedrg -i {smiles_file} -o {CompoundCode}" restraints_command = f"module load buster; module load graphviz; \ export CSDHOME=/dls_sw/apps/CSDS/2024.1.0/; export BDG_TOOL_MOGUL=/dls_sw/apps/CSDS/2024.1.0/ccdc-software/mogul/bin/mogul; \ - grade2 --in {smiles_file} --itype smi --out {CompoundCode} -f; " + grade2 --in {smiles_file} --itype smi --out {CompoundCode} -f" try: result = subprocess.run( @@ -125,11 +126,6 @@ def run(self): with open(pandda_log, "w") as log_file: log_file.write(result.stdout) - # does ligand dir exist if --use_ligand_data=False ? 
-        # for item in compound_dir.iterdir():
-        #     if item.is_file():
-        #         shutil.copy2(item, ligand_dir / item.name)

         modelled_dir = dataset_pdir / "modelled_structures"
         out_dir = modelled_dir / "rhofit"
         out_dir.mkdir(parents=True, exist_ok=True)
@@ -396,15 +392,3 @@ def update_data_source(self, db_dict, dtag, database_path):
         cursor = conn.cursor()
         cursor.execute(sql, db_dict)
         conn.commit()
-
-    # Integrate back with XCE via datasource
-    # db_dict = {}
-    # db_dict["DimplePANDDAwasRun"] = True
-    # # db_dict["DimplePANDDAreject"] = False
-    # db_dict["DimplePANDDApath"] = str(auto_panddas_dir / "processed_datasets")
-
-    # try:
-    #     self.update_data_source(db_dict, dtag, database_path)
-    #     self.log.info(f"Updated sqlite database for dataset {dtag}")
-    # except Exception as e:
-    #     self.log.info(f"Could not update sqlite database for dataset {dtag}: {e}")
diff --git a/src/dlstbx/wrapper/pipedream_xchem.py b/src/dlstbx/wrapper/pipedream_xchem.py
index af8d1706f..d02ed6a76 100644
--- a/src/dlstbx/wrapper/pipedream_xchem.py
+++ b/src/dlstbx/wrapper/pipedream_xchem.py
@@ -1,9 +1,13 @@
 from __future__ import annotations

+import json
+import os
 import sqlite3
 import subprocess
 from pathlib import Path

+import portalocker
+
 from dlstbx.wrapper import Wrapper


 class PipedreamWrapper(Wrapper):
@@ -21,19 +25,78 @@ def run(self):

         # database_path = Path(params.get("database_path"))
         processed_dir = Path(params.get("processed_directory"))
         analysis_dir = Path(processed_dir / "analysis")
-        model_dir = Path(params.get("model_directory"))
+        pipedream_dir = analysis_dir / "pipedream"
+        model_dir = pipedream_dir / "auto_model_building"
         dtag = params.get("dtag")

         dataset_dir = model_dir / dtag
-        pipedream_dir = analysis_dir / "pipedream"
-        Path(pipedream_dir).mkdir(parents=True, exist_ok=True)
         out_dir = pipedream_dir / dtag
-        upstream_mtz = params.get("upstream_mtz")
         dimple_pdb = dataset_dir / "dimple.pdb"
         dimple_mtz = dataset_dir / "dimple.mtz"
+        upstream_mtz = dataset_dir / params.get("upstream_mtz")

         self.log.info(f"Processing dtag: {dtag}")
+
+        dataset_dir = model_dir / dtag
+        compound_dir = dataset_dir / "compound"
+
+        smiles_files = list(compound_dir.glob("*.smiles"))
+
+        if len(smiles_files) == 0:
+            self.log.error(
+                f"No .smiles file present in {compound_dir}, cannot continue for dtag {dtag}"
+            )
+            return False
+        elif len(smiles_files) > 1:
+            self.log.error(
+                f"Multiple .smiles files found in {compound_dir}: {smiles_files}, cannot continue for dtag {dtag}"
+            )
+            return False
+
+        smiles_file = smiles_files[0]
+        CompoundCode = smiles_file.stem
+
+        # -------------------------------------------------------
+        restraints_command = f"module load buster; module load graphviz; \
+            export CSDHOME=/dls_sw/apps/CSDS/2024.1.0/; export BDG_TOOL_MOGUL=/dls_sw/apps/CSDS/2024.1.0/ccdc-software/mogul/bin/mogul; \
+            grade2 --in {smiles_file} --itype smi --out {CompoundCode} -f"
+
+        try:
+            result = subprocess.run(
+                restraints_command,
+                shell=True,
+                capture_output=True,
+                text=True,
+                cwd=compound_dir,
+                check=True,
+                timeout=params.get("timeout-minutes") * 60,
+            )
+
+        except subprocess.CalledProcessError as e:
+            self.log.error(
+                f"Ligand restraint generation command: '{restraints_command}' failed for dataset {dtag}"
+            )
+
+            self.log.info(e.stdout)
+            self.log.error(e.stderr)
+            return False
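The grade2 call above, and the pipedream and edstats calls later in this wrapper, all shell out the same way: a "module load"/"export" prefix, shell=True, captured output, check=True and a minute-based timeout. Condensed into one helper, the shared shape looks roughly like this (helper name and signature are illustrative, not part of the wrapper):

    import subprocess

    def run_shell_tool(command: str, cwd, timeout_minutes: float, log):
        # shell=True lets the 'module load'/'export' prefix prepare the
        # environment; check=True raises on a non-zero exit status.
        try:
            return subprocess.run(
                command,
                shell=True,
                capture_output=True,
                text=True,
                cwd=cwd,
                check=True,
                timeout=timeout_minutes * 60,
            )
        except subprocess.CalledProcessError as e:
            log.error(f"Command failed: {command}")
            log.error(e.stderr)
            return None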
+
+        restraints = compound_dir / f"{CompoundCode}.restraints.cif"
+        restraints.rename(compound_dir / f"{CompoundCode}.cif")
+        pdb = compound_dir / f"{CompoundCode}.xyz.pdb"
+        pdb.rename(compound_dir / f"{CompoundCode}.pdb")
+
+        with open(dataset_dir / "restraints.log", "w") as log_file:
+            log_file.write(result.stdout)
+
+        ligand_cif = str(compound_dir / f"{CompoundCode}.cif")
+        self.log.info(f"Restraints generated successfully for dtag {dtag}")
+
+        self.log.info(f"Removing crystallisation components from pdb file for {dtag}")
         self.process_pdb_file(dimple_pdb)
+        self.log.info(f"Launching pipedream for {dtag}")
+
+        # -------------------------------------------------------

         pipedream_command = f"module load buster; module load graphviz; \
             export BDG_TOOL_MOGUL=/dls_sw/apps/CSDS/2024.1.0/ccdc-software/mogul/bin/mogul; \
@@ -50,32 +113,106 @@ def run(self):
             -runpepflip \
             -rhocommands \
             -xclusters \
-            -nochirals "
-        # -rhofit ligand.cif
+            -nochirals \
+            -rhofit {ligand_cif}"
+
+        # try:
+        #     result = subprocess.run(
+        #         pipedream_command,
+        #         shell=True,
+        #         capture_output=True,
+        #         text=True,
+        #         cwd=analysis_dir,
+        #         check=True,
+        #         timeout=params.get("timeout-minutes") * 60,
+        #     )
+
+        # except subprocess.CalledProcessError as e:
+        #     self.log.error(f"Pipedream command: '{pipedream_command}' failed")
+        #     self.log.info(e.stdout)
+        #     self.log.error(e.stderr)
+        #     return False
+
+        self.log.info(f"Pipedream finished successfully for dtag {dtag}")
+
+        pipedream_summary = f"{out_dir}/pipedream_summary.json"
+        self.save_dataset_metadata(
+            str(pipedream_dir),
+            str(compound_dir),
+            str(out_dir),
+            CompoundCode,
+            params.get("smiles"),
+            pipedream_command,
+            dtag,
+        )
+
+        try:
+            with open(pipedream_summary, "r") as f:
+                data = json.load(f)
+                reslo = (
+                    data.get("dataprocessing", {})
+                    .get("inputdata", {})
+                    .get("reslo", None)
+                )
+                reshi = (
+                    data.get("dataprocessing", {})
+                    .get("inputdata", {})
+                    .get("reshigh", None)
+                )
+        except Exception as e:
+            self.log.info(f"Cannot continue with pipedream postprocessing: {e}")
+            return True
+
+        # Post-processing: Generate maps and run edstats
+        postrefine_dir = out_dir / f"postrefine-{CompoundCode}"
+        refine_mtz = postrefine_dir / "refine.mtz"
+        refine_pdb = postrefine_dir / "refine.pdb"
+        map_2fofc = postrefine_dir / "refine_2fofc.map"
+        map_fofc = postrefine_dir / "refine_fofc.map"
+
+        try:
+            os.system(f"gemmi sf2map --sample 5 {str(refine_mtz)} {map_2fofc} 2>&1")
+            os.system(f"gemmi sf2map --sample 5 {str(refine_mtz)} {map_fofc} 2>&1")
+        except Exception as e:
+            self.log.debug(f"Cannot continue with pipedream postprocessing: {e}")
+            return True
+
+        if reslo is None or reshi is None:
+            self.log.debug(
+                "Cannot continue with pipedream postprocessing: resolution range None"
+            )
+            return True
+
+        # Run edstats if both maps exist and resolution range is found
+        if not map_2fofc.exists() or not map_fofc.exists():
+            self.log.debug(
+                "Cannot continue with pipedream postprocessing: maps not found"
+            )
+            return True
+
+        edstats_command = f"module load ccp4; edstats XYZIN {refine_pdb} MAPIN1 {map_2fofc} MAPIN2 {map_fofc} OUT {str(postrefine_dir / 'edstats.out')}"
+        stdin_text = f"RESLO={reslo}\nRESHI={reshi}\nEND\n"

         try:
             result = subprocess.run(
-                pipedream_command,
-                shell=True,
-                capture_output=True,
+                edstats_command,
+                input=stdin_text,
                 text=True,
-                cwd=analysis_dir,
+                capture_output=True,
                 check=True,
-                timeout=params.get("timeout-minutes") * 60,
+                shell=True,
             )

         except subprocess.CalledProcessError as e:
-            self.log.error(f"Pipedream command: '{pipedream_command}' failed")
+            self.log.error(f"Edstats command: '{edstats_command}' failed")
             self.log.info(e.stdout)
             self.log.error(e.stderr)
-            return False
+            return True

-        self.log.info(f"Pipedream finished successfully for dtag {dtag}")
+        self.log.info(f"Pipedream postprocessing finished successfully for 
dtag {dtag}") return True - def process_pdb_file(self, dimple_pdb: Path, dtag: str): - self.log.info(f"Removing crystallisation components from pdb file for {dtag}") - + def process_pdb_file(self, dimple_pdb: Path): if dimple_pdb.exists(): with open(dimple_pdb, "r", encoding="utf-8") as f: lines = f.readlines() @@ -109,14 +246,57 @@ def process_pdb_file(self, dimple_pdb: Path, dtag: str): if count > 0 ] ) - self.log.debug( - f"Removed {removed_total} lines from {dtag} ({component_summary})" - ) + self.log.debug(f"Removed {removed_total} lines. ({component_summary})") else: self.log.debug(f"Dimple pdb {dimple_pdb} does not exist") return True + def save_dataset_metadata( + self, + pipedream_dir, + input_dir, + output_dir, + compound_code, + smiles_string, + pipedream_cmd, + dtag, + ): + metadata = { + "Input_dir": input_dir, + "CompoundCode": compound_code, + "PipedreamDirectory": output_dir, + "ReportHTML": f"{output_dir}/report-{compound_code}/index.html", + "LigandReportHTML": f"{output_dir}/report-{compound_code}/ligand/index.html", + "ExpectedSummary": f"{output_dir}/pipedream_summary.json", + "PipedreamCommand": pipedream_cmd, + "ExpectedCIF": os.path.join(input_dir, f"{compound_code}.cif"), + "ExpectedPDB": os.path.join(input_dir, f"{compound_code}.pdb"), + "InputSMILES": smiles_string, + } + + output_yaml = {} + output_yaml[dtag] = metadata + json_file = f"{pipedream_dir}/Pipedream_output.json" + + # Acquire a lock + with portalocker.Lock(json_file, "a", timeout=5): + if os.path.exists(json_file) and os.path.getsize(json_file) > 0: + with open(json_file, "r", encoding="utf-8") as f: + try: + data = json.load(f) + except Exception as e: + self.log.debug( + f"Cannot continue with pipedream postprocessing: {e}" + ) + else: + data = {} + + data.update(output_yaml) + + with open(json_file, "w", encoding="utf-8") as f: + json.dump(data, f, indent=2) + def update_data_source(self, db_dict, dtag, database_path): sql = ( "UPDATE mainTable SET " From 22336f22799ee8b2e09b17f11d30b2da7e55a593 Mon Sep 17 00:00:00 2001 From: rowanwalker96 Date: Tue, 13 Jan 2026 17:16:19 +0000 Subject: [PATCH 6/7] Clean wrapper --- src/dlstbx/wrapper/pipedream_xchem.py | 35 ++++++++++++++------------- 1 file changed, 18 insertions(+), 17 deletions(-) diff --git a/src/dlstbx/wrapper/pipedream_xchem.py b/src/dlstbx/wrapper/pipedream_xchem.py index d02ed6a76..6ccdd43a4 100644 --- a/src/dlstbx/wrapper/pipedream_xchem.py +++ b/src/dlstbx/wrapper/pipedream_xchem.py @@ -28,6 +28,7 @@ def run(self): pipedream_dir = analysis_dir / "pipedream" model_dir = pipedream_dir / "auto_model_building" dtag = params.get("dtag") + smiles = params.get("smiles") dataset_dir = model_dir / dtag out_dir = pipedream_dir / dtag @@ -116,22 +117,22 @@ def run(self): -nochirals \ -rhofit {ligand_cif}" - # try: - # result = subprocess.run( - # pipedream_command, - # shell=True, - # capture_output=True, - # text=True, - # cwd=analysis_dir, - # check=True, - # timeout=params.get("timeout-minutes") * 60, - # ) - - # except subprocess.CalledProcessError as e: - # self.log.error(f"Pipedream command: '{pipedream_command}' failed") - # self.log.info(e.stdout) - # self.log.error(e.stderr) - # return False + try: + result = subprocess.run( + pipedream_command, + shell=True, + capture_output=True, + text=True, + cwd=analysis_dir, + check=True, + timeout=params.get("timeout-minutes") * 60, + ) + + except subprocess.CalledProcessError as e: + self.log.error(f"Pipedream command: '{pipedream_command}' failed") + self.log.info(e.stdout) + 
self.log.error(e.stderr) + return False self.log.info(f"Pipedream finished successfully for dtag {dtag}") @@ -141,7 +142,7 @@ def run(self): str(compound_dir), str(out_dir), CompoundCode, - params.get("smiles"), + smiles, pipedream_command, dtag, ) From 7026f9679c164441277ae8a103a7969811cbc9cb Mon Sep 17 00:00:00 2001 From: rowanwalker96 Date: Wed, 14 Jan 2026 15:40:46 +0000 Subject: [PATCH 7/7] Improve logging --- src/dlstbx/services/trigger_xchem.py | 79 +++++++++++++++------------ src/dlstbx/wrapper/pandda_xchem.py | 4 +- src/dlstbx/wrapper/pipedream_xchem.py | 11 ++-- 3 files changed, 51 insertions(+), 43 deletions(-) diff --git a/src/dlstbx/services/trigger_xchem.py b/src/dlstbx/services/trigger_xchem.py index d432cd3a9..d450afff5 100644 --- a/src/dlstbx/services/trigger_xchem.py +++ b/src/dlstbx/services/trigger_xchem.py @@ -316,7 +316,9 @@ def trigger_pandda_xchem( print(f"Problem reading .sqlite database for {subdir}: {e}") if not match: - self.log.debug(f"No directory found for {acronym}, can't continue") + self.log.debug( + f"Exiting PanDDA2/Pipedream trigger: No directory found for {acronym}." + ) return {"success": True} else: xchem_visit_dir = match_dir @@ -324,17 +326,17 @@ def trigger_pandda_xchem( if xchem_visit_dir: self.log.debug( - f"Found a corresponding .sqlite database in XChem visit {xchem_visit_dir} for target {acronym}" + f"Found a corresponding .sqlite database in XChem visit {xchem_visit_dir} for target {acronym}." ) else: self.log.debug( - f"Could not find a corresponding .sqlite database in XChem visit {xchem_dir} for target {acronym}, can't continue" + f"Exiting PanDDA2/Pipedream trigger: can't find .sqlite database in XChem visit {xchem_dir} for target {acronym}." ) return {"success": True} processing_dir = xchem_visit_dir / "processing" + db = processing_dir / "database" / "soakDBDataFile.sqlite" processed_dir = xchem_visit_dir / "processed" - db = xchem_visit_dir / "processing/database" / "soakDBDataFile.sqlite" # Make a copy of the most recent sqlite for reading # db_copy = xchem_visit_dir / "processing/database" / "auto_soakDBDataFile.sqlite" @@ -366,11 +368,11 @@ def trigger_pandda_xchem( if query.first()[0] == "fast_dp": self.log.info( - "Aborting PanDDA2 trigger as upstream processingProgram is fast_dp" + "Exiting PanDDA2/Pipedream trigger: upstream processingProgram is fast_dp" ) return {"success": True} - # If other dimple/PanDDA2 job is running, quit, dimple set to trigger even if it fails + # If other dimple/PanDDA2 job is running, quit, dimple set to trigger even if it fails? 
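This guard, and the stop-gap and upstream checkpoints just below it, all ask the same question: did a matching processing job start within some look-back window? A hedged sketch of a shared helper, assuming the ISPyB models this module already imports and the join its queries already use; the program names passed in would be e.g. dimple or PanDDA2:

    from datetime import datetime, timedelta

    def recent_processing(session, dcids, programs, hours,
                          ProcessingJob, AutoProcProgram):
        # Returns a row if any matching job was recorded inside the window,
        # else None; callers treat a hit as "abort this trigger".
        min_start_time = datetime.now() - timedelta(hours=hours)
        return (
            session.query(AutoProcProgram, ProcessingJob.dataCollectionId)
            .join(ProcessingJob)
            .filter(ProcessingJob.dataCollectionId.in_(dcids))
            .filter(AutoProcProgram.processingPrograms.in_(programs))
            .filter(AutoProcProgram.recordTimeStamp > min_start_time)
            .first()
        )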
min_start_time = datetime.now() - timedelta(hours=6) query = ( @@ -394,11 +396,11 @@ def trigger_pandda_xchem( if triggered_processing_job := query.first(): self.log.info( - f"Aborting PanDDA2 trigger as another {triggered_processing_job.AutoProcProgram.processingPrograms} job has started for dcid {triggered_processing_job.dataCollectionId}" + f"Exiting PanDDA2/Pipedream trigger: another {triggered_processing_job.AutoProcProgram.processingPrograms} job has started for dcid {triggered_processing_job.dataCollectionId}" ) return {"success": True} - # Stop-gap + # Stop-gap, interval > max checkpoint time min_start_time = datetime.now() - timedelta(hours=3) query = ( @@ -416,7 +418,7 @@ def trigger_pandda_xchem( if triggered_processing_job := query.first(): self.log.info( - "Aborting PanDDA2 trigger as another PanDDA2 job was recently launched" + f"Exiting PanDDA2/Pipedream trigger: another PanDDA2 job was recently launched for dcid {dcid}" ) return {"success": True} @@ -468,7 +470,7 @@ def trigger_pandda_xchem( # Give up waiting for this program to finish and trigger # pandda with remaining related results are available self.log.info( - f"max-try exceeded, giving up waiting for related processings for appids {waiting_appids}\n" + f"Max-try exceeded, giving up waiting for related processings for appids {waiting_appids}\n" ) else: # Send results to myself for next round of processing @@ -562,7 +564,7 @@ def trigger_pandda_xchem( df2 = pd.read_sql(query.statement, query.session.bind) if df2.empty: self.log.info( - f"No successful dimple jobs for dcid {dcid}, can't continue..." + f"Exiting PanDDA2/Pipedream trigger: No successful dimple jobs for dcid {dcid}, skipping..." ) return {"success": True} @@ -581,7 +583,7 @@ def trigger_pandda_xchem( if df3.empty: self.log.info( - f"Problem finding 'best' dataset to take forward for PanDDA2 for dcid {dcid}" + f"Exiting PanDDA2/Pipedream trigger: Issue selecting dataset to take forward for dcid {dcid}" ) return {"success": True} @@ -598,8 +600,9 @@ def trigger_pandda_xchem( self.log.info(f"Chosen mtz for dcid {dcid} is {upstream_mtz}") else: self.log.info( - "Cannot trigger PanDDA2/Pipedream: no environment information" + "Exiting PanDDA2/Pipedream trigger: no environment information" ) + return {"success": True} # upstream_proc = df[df['autoProcScalingId']==scaling_id]['processingPrograms'].item() # fails pdb = chosen_dataset_path + "/final.pdb" @@ -621,7 +624,7 @@ def trigger_pandda_xchem( dtag = query.one()[1] code = query.one()[2] - # Read XChem SQLite for ligand info + # Read XChem SQLite database for ligand info try: conn = sqlite3.connect(f"file:{db}?mode=ro", uri=True, timeout=10) df = pd.read_sql_query( @@ -631,7 +634,7 @@ def trigger_pandda_xchem( except Exception as e: self.log.info( - f"Exception whilst reading ligand information from {db} for dtag {dtag}: {e}" + f"Exiting PanDDA2/Pipedream trigger: Exception whilst reading ligand information from {db} for dtag {dtag}: {e}" ) return {"success": True} @@ -641,7 +644,7 @@ def trigger_pandda_xchem( if len(df) != 1: self.log.info( - f"Unique row in .sqlite for dtag {dtag}, puck {code}, puck position {location} cannot be found in database {db}, can't continue." + f"Exiting PanDDA2/Pipedream trigger: Unique row in .sqlite for dtag {dtag}, puck {code}, puck position {location} cannot be found in {db}, skipping..." 
) return {"success": True} @@ -649,9 +652,9 @@ def trigger_pandda_xchem( CompoundSMILES = df["CompoundSMILES"].item() CompoundCode = df["CompoundCode"].item() - if LibraryName == "DMSO": # exclude DMSO screen from PanDDA analysis + if LibraryName == "DMSO": self.log.info( - f"{dtag} is DMSO solvent screen, excluding from PanDDA analysis" + f"Exiting PanDDA2/Pipedream trigger: {dtag} is DMSO solvent screen, skipping..." ) return {"success": True} elif not CompoundSMILES or str(CompoundSMILES).strip().lower() in [ @@ -661,7 +664,7 @@ def trigger_pandda_xchem( "", ]: self.log.info( - f"Puck {code}, puck position {location} has no corresponding CompoundSMILES. Skipping..." + f"Exiting PanDDA2/Pipedream trigger: {dtag} has no corresponding CompoundSMILES, skipping..." ) return {"success": True} @@ -688,20 +691,21 @@ def trigger_pandda_xchem( smi_file.write(CompoundSMILES) # Create seperate pipedream directory - pipedream_dir = analysis_dir / "pipedream" - model_dir_pd = pipedream_dir / "model_building" - dataset_dir_pd = model_dir_pd / dtag - compound_dir_pd = dataset_dir_pd / "compound" - self.log.info(f"Creating directory {dataset_dir_pd}") - pathlib.Path(compound_dir_pd).mkdir(parents=True, exist_ok=True) - shutil.copy(pdb, str(dataset_dir_pd / "dimple.pdb")) - shutil.copy(mtz, str(dataset_dir_pd / "dimple.mtz")) - shutil.copy( - upstream_mtz, str(dataset_dir_pd / pathlib.Path(upstream_mtz).parts[-1]) - ) - - with open(compound_dir_pd / f"{CompoundCode}.smiles", "w") as smi_file: - smi_file.write(CompoundSMILES) + if pipedream: + pipedream_dir = analysis_dir / "pipedream" + model_dir_pd = pipedream_dir / "model_building" + dataset_dir_pd = model_dir_pd / dtag + compound_dir_pd = dataset_dir_pd / "compound" + self.log.info(f"Creating directory {dataset_dir_pd}") + pathlib.Path(compound_dir_pd).mkdir(parents=True, exist_ok=True) + shutil.copy(pdb, str(dataset_dir_pd / "dimple.pdb")) + shutil.copy(mtz, str(dataset_dir_pd / "dimple.mtz")) + shutil.copy( + upstream_mtz, str(dataset_dir_pd / pathlib.Path(upstream_mtz).parts[-1]) + ) + + with open(compound_dir_pd / f"{CompoundCode}.smiles", "w") as smi_file: + smi_file.write(CompoundSMILES) # 4. Job launch logic @@ -720,10 +724,11 @@ def trigger_pandda_xchem( if dataset_count < comparator_threshold: self.log.info( - f"Dataset dataset_count {dataset_count} < comparator dataset threshold of {comparator_threshold}, skipping PanDDA2 for now..." + f"{dataset_count} < comparator dataset threshold of {comparator_threshold}, skipping PanDDA2 for now..." 
) if pipedream: + self.log.info(f"Launching Pipedream for dtag {dtag}") self.upsert_proc(rw, dcid, "Pipedream", recipe_parameters) return {"success": True} @@ -734,11 +739,12 @@ def trigger_pandda_xchem( json.dump(dataset_list, f) self.log.info( - f"Dataset dataset_count {dataset_count} = comparator dataset threshold of {comparator_threshold}, launching PanDDA2 array job" + f"{dataset_count} = comparator dataset threshold of {comparator_threshold}, launching PanDDA2 array job" ) self.upsert_proc(rw, dcid, "PanDDA2", recipe_parameters) if pipedream: + self.log.info(f"Launching Pipedream for dtag {dtag}") self.upsert_proc(rw, dcid, "Pipedream", recipe_parameters) elif dataset_count > comparator_threshold: @@ -746,6 +752,7 @@ def trigger_pandda_xchem( self.upsert_proc(rw, dcid, "PanDDA2", recipe_parameters) if pipedream: + self.log.info(f"Launching Pipedream for dtag {dtag}") self.upsert_proc(rw, dcid, "Pipedream", recipe_parameters) return {"success": True} @@ -790,7 +797,7 @@ def trigger_pandda_xchem_post( visit_number = visit.split("-")[1] # If other PanDDA2 postrun within visit running, quit - min_start_time = datetime.now() - timedelta(minutes=20) + min_start_time = datetime.now() - timedelta(minutes=30) # from proposal and visit get all dcids query = ( diff --git a/src/dlstbx/wrapper/pandda_xchem.py b/src/dlstbx/wrapper/pandda_xchem.py index 71dd89866..2e70da664 100644 --- a/src/dlstbx/wrapper/pandda_xchem.py +++ b/src/dlstbx/wrapper/pandda_xchem.py @@ -23,15 +23,15 @@ def run(self): f"Running recipewrap file {self.recwrap.recipe_step['parameters']['recipewrapper']}" ) + PANDDA_2_DIR = "/dls_sw/i04-1/software/PanDDA2" slurm_task_id = os.environ.get("SLURM_ARRAY_TASK_ID") params = self.recwrap.recipe_step["job_parameters"] - PANDDA_2_DIR = "/dls_sw/i04-1/software/PanDDA2" # database_path = Path(params.get("database_path")) processed_dir = Path(params.get("processed_directory")) analysis_dir = Path(processed_dir / "analysis") pandda_dir = analysis_dir / "pandda2" - model_dir = Path(params.get("model_directory")) + model_dir = pandda_dir / "model_building" auto_panddas_dir = Path(pandda_dir / "panddas") Path(auto_panddas_dir).mkdir(exist_ok=True) diff --git a/src/dlstbx/wrapper/pipedream_xchem.py b/src/dlstbx/wrapper/pipedream_xchem.py index 6ccdd43a4..57f84e6e4 100644 --- a/src/dlstbx/wrapper/pipedream_xchem.py +++ b/src/dlstbx/wrapper/pipedream_xchem.py @@ -26,12 +26,13 @@ def run(self): processed_dir = Path(params.get("processed_directory")) analysis_dir = Path(processed_dir / "analysis") pipedream_dir = analysis_dir / "pipedream" - model_dir = pipedream_dir / "auto_model_building" + model_dir = pipedream_dir / "model_building" dtag = params.get("dtag") smiles = params.get("smiles") dataset_dir = model_dir / dtag out_dir = pipedream_dir / dtag + dimple_pdb = dataset_dir / "dimple.pdb" dimple_mtz = dataset_dir / "dimple.mtz" upstream_mtz = dataset_dir / params.get("upstream_mtz") @@ -123,7 +124,7 @@ def run(self): shell=True, capture_output=True, text=True, - cwd=analysis_dir, + cwd=pipedream_dir, check=True, timeout=params.get("timeout-minutes") * 60, ) @@ -161,7 +162,7 @@ def run(self): .get("reshigh", None) ) except Exception as e: - self.log.info(f"Cannot continue with pipedream postprocessing: {e}") + self.log.info(f"Can't continue with pipedream postprocessing: {e}") return True # Post-processing: Generate maps and run edstats @@ -180,14 +181,14 @@ def run(self): if reslo is None or reshi is None: self.log.debug( - "Cannot continue with pipedream postprocessing: resolution range 
None" + "Can't continue with pipedream postprocessing: resolution range None" ) return True # Run edstats if both maps exist and resolution range is found if not map_2fofc.exists() or not map_fofc.exists(): self.log.debug( - "Cannot continue with pipedream postprocessing: maps not found" + "Can't continue with pipedream postprocessing: maps not found" ) return True
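Across these patches, the one piece of shared state the per-dataset Pipedream jobs touch is Pipedream_output.json, which save_dataset_metadata updates under a portalocker lock. The locked read-modify-write reduces to this shape (function name illustrative; the lock mode and timeout are the ones the wrapper uses):

    import json
    import os

    import portalocker

    def merge_into_json(json_file: str, key: str, metadata: dict) -> None:
        # Hold the lock across the whole read-update-write cycle so two
        # concurrent wrappers cannot interleave and drop an entry.
        with portalocker.Lock(json_file, "a", timeout=5):
            data = {}
            if os.path.exists(json_file) and os.path.getsize(json_file) > 0:
                with open(json_file, encoding="utf-8") as f:
                    data = json.load(f)
            data[key] = metadata
            with open(json_file, "w", encoding="utf-8") as f:
                json.dump(data, f, indent=2)

Called as merge_into_json(f"{pipedream_dir}/Pipedream_output.json", dtag, metadata), this matches the one-entry-per-dtag layout the wrapper writes.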