diff --git a/requirements.conda.txt b/requirements.conda.txt index 76c15bd8b..c371fe235 100644 --- a/requirements.conda.txt +++ b/requirements.conda.txt @@ -11,6 +11,7 @@ nxmx pandas pika pint +portalocker procrunner>=1.0.2 prometheus_client pydantic>=2.0.3 diff --git a/setup.py b/setup.py index b948ad46b..97d80d313 100644 --- a/setup.py +++ b/setup.py @@ -80,6 +80,7 @@ "pandda_xchem = dlstbx.wrapper.pandda_xchem:PanDDAWrapper", "pandda_post = dlstbx.wrapper.pandda_post:PanDDApostWrapper", "pandda_rhofit = dlstbx.wrapper.pandda_rhofit:PanDDARhofitWrapper", + "pipedream_xchem = dlstbx.wrapper.pipedream_xchem:PipedreamWrapper", "phaser_ellg = dlstbx.wrapper.phaser_ellg:PhasereLLGWrapper", "rlv = dlstbx.wrapper.rlv:RLVWrapper", "scaleit = dlstbx.wrapper.scaleit:ScaleitWrapper", diff --git a/src/dlstbx/services/trigger_xchem.py b/src/dlstbx/services/trigger_xchem.py index 385109eb1..d432cd3a9 100644 --- a/src/dlstbx/services/trigger_xchem.py +++ b/src/dlstbx/services/trigger_xchem.py @@ -1,7 +1,9 @@ from __future__ import annotations +import ast import json import pathlib +import re import shutil import sqlite3 from datetime import datetime, timedelta @@ -57,6 +59,7 @@ class PanDDAParameters(pydantic.BaseModel): backoff_delay: float = pydantic.Field(default=45, alias="backoff-delay") backoff_max_try: int = pydantic.Field(default=30, alias="backoff-max-try") backoff_multiplier: float = pydantic.Field(default=1.1, alias="backoff-multiplier") + pipedream: Optional[bool] = True class PanDDA_PostParameters(pydantic.BaseModel): @@ -64,7 +67,7 @@ class PanDDA_PostParameters(pydantic.BaseModel): automatic: Optional[bool] = False comment: Optional[str] = None scaling_id: list[int] - processing_directory: str + processed_directory: str timeout: float = pydantic.Field(default=60, alias="timeout-minutes") @@ -157,6 +160,34 @@ def trigger(self, rw, header, message): return rw.transport.transaction_commit(txn) + def upsert_proc(self, rw, dcid, procname, recipe_parameters): + jp = self.ispyb.mx_processing.get_job_params() + jp["automatic"] = True + # jp["comments"] = parameters.comment + jp["datacollectionid"] = dcid + jp["display_name"] = procname + jp["recipe"] = f"postprocessing-{procname.lower()}" + self.log.info(jp) + jobid = self.ispyb.mx_processing.upsert_job(list(jp.values())) + self.log.debug(f"{procname} trigger: generated JobID {jobid}") + + for key, value in recipe_parameters.items(): + jpp = self.ispyb.mx_processing.get_job_parameter_params() + jpp["job_id"] = jobid + jpp["parameter_key"] = key + jpp["parameter_value"] = value + jppid = self.ispyb.mx_processing.upsert_job_parameter(list(jpp.values())) + self.log.debug( + f"{procname} trigger: generated JobParameterID {jppid} with {key}={value}" + ) + + self.log.debug(f"{procname} trigger: Processing job {jobid} created") + + message = {"recipes": [], "parameters": {"ispyb_process": jobid}} + rw.transport.send("processing_recipe", message) + + self.log.info(f"{procname} trigger: Processing job {jobid} triggered") + @pydantic.validate_call(config={"arbitrary_types_allowed": True}) def trigger_pandda_xchem( self, @@ -195,6 +226,8 @@ def trigger_pandda_xchem( dcid = parameters.dcid scaling_id = parameters.scaling_id[0] + comparator_threshold = parameters.comparator_threshold + pipedream = parameters.pipedream protein_info = get_protein_for_dcid(parameters.dcid, session) # protein_id = getattr(protein_info, "proteinId") @@ -204,21 +237,21 @@ def trigger_pandda_xchem( query = (session.query(Proposal)).filter(Proposal.proposalId == proposal_id)
proposal = query.first() - # 0. Check that this is an XChem expt, find .sqlite database + # 0. Check that this is an XChem expt & locate .SQLite database if proposal.proposalCode not in {"lb"}: # need to handle industrial 'sw' also self.log.debug( f"Not triggering PanDDA2 pipeline for dcid={dcid} with proposal_code={proposal.proposalCode}" ) return {"success": True} - # TEMPORARY, OPENBIND TEST VISIT + # TEMPORARY, FILTER BY OPENBIND VISIT if proposal.proposalNumber not in {"42888"}: self.log.debug( f"Not triggering PanDDA2 pipeline for dcid={dcid}, only accepting data collections from lb42888 during test phase" ) return {"success": True} - # Find corresponding xchem visit directory and database + # Find corresponding XChem visit directory and database xchem_dir = pathlib.Path( f"/dls/labxchem/data/{proposal.proposalCode}{proposal.proposalNumber}" ) @@ -260,7 +293,7 @@ def trigger_pandda_xchem( subdir / "processing/database" / "soakDBDataFile.sqlite" ) con = sqlite3.connect( - f"file:{db_path}?mode=ro", uri=True, timeout=20 + f"file:{db_path}?mode=ro", uri=True, timeout=10 ) cur = con.cursor() cur.execute("SELECT Protein FROM soakDB") @@ -271,12 +304,6 @@ def trigger_pandda_xchem( # visit = dir.parts[-1] expt_yaml = {} expt_yaml["data"] = {"acronym": name} - # expt_yaml["autoprocessing"] = {} - # expt_yaml["autoprocessing"]["pandda"] = { - # "prerun-threshold": 300, - # "heuristic": "default", - # } - with open(subdir / ".user.yaml", "w") as f: yaml.dump(expt_yaml, f) @@ -306,6 +333,7 @@ def trigger_pandda_xchem( return {"success": True} processing_dir = xchem_visit_dir / "processing" + processed_dir = xchem_visit_dir / "processed" db = xchem_visit_dir / "processing/database" / "soakDBDataFile.sqlite" # Make a copy of the most recent sqlite for reading @@ -371,7 +399,7 @@ def trigger_pandda_xchem( return {"success": True} # Stop-gap - min_start_time = datetime.now() - timedelta(minutes=30) + min_start_time = datetime.now() - timedelta(hours=3) query = ( ( @@ -393,7 +421,7 @@ def trigger_pandda_xchem( return {"success": True} # Now check if other upstream pipeline is running and if so, checkpoint (it might fail) - min_start_time = datetime.now() - timedelta(hours=3) + min_start_time = datetime.now() - timedelta(hours=6) query = ( ( session.query(AutoProcProgram, ProcessingJob.dataCollectionId).join( @@ -562,10 +590,22 @@ def trigger_pandda_xchem( f"Chosen dataset to take forward: {chosen_dataset_path} for dcid {dcid}" ) scaling_id = int(df3["autoProcScalingId"][0]) + environment = df3["processingEnvironment"][0] + environment = re.search(r"data=(\[[^\]]*\])", environment) + + if environment: + upstream_mtz = ast.literal_eval(environment.group(1))[0] + self.log.info(f"Chosen mtz for dcid {dcid} is {upstream_mtz}") + else: + self.log.info( + "Cannot trigger PanDDA2/Pipedream: no environment information" + ) + + # upstream_proc = df[df['autoProcScalingId']==scaling_id]['processingPrograms'].item() # fails pdb = chosen_dataset_path + "/final.pdb" mtz = chosen_dataset_path + "/final.mtz" - self.log.debug("PanDDA2 trigger: Starting") + self.log.debug("PanDDA2/Pipedream trigger: Starting") # 2. 
Get ligand information, location & container code @@ -583,7 +623,7 @@ def trigger_pandda_xchem( # Read XChem SQLite for ligand info try: - conn = sqlite3.connect(f"file:{db}?mode=ro", uri=True, timeout=20) + conn = sqlite3.connect(f"file:{db}?mode=ro", uri=True, timeout=10) df = pd.read_sql_query( f"SELECT * from mainTable WHERE Puck = '{code}' AND PuckPosition = {location} AND CrystalName = '{dtag}'", conn, @@ -601,30 +641,34 @@ def trigger_pandda_xchem( if len(df) != 1: self.log.info( - f"Unique row in .sqlite for dcid {dcid}, puck {code}, puck position {location} cannot be found in database {db}, can't continue." + f"Unique row in .sqlite for dtag {dtag}, puck {code}, puck position {location} cannot be found in database {db}, can't continue." ) return {"success": True} - # ProteinName = df["ProteinName"].item() LibraryName = df["LibraryName"].item() CompoundSMILES = df["CompoundSMILES"].item() CompoundCode = df["CompoundCode"].item() if LibraryName == "DMSO": # exclude DMSO screen from PanDDA analysis self.log.info( - f"Puck {code}, puck position {location} is from DMSO solvent screen, excluding from PanDDA analysis" + f"{dtag} is from the DMSO solvent screen, excluding from PanDDA analysis" ) return {"success": True} - elif not CompoundSMILES: + elif not CompoundSMILES or str(CompoundSMILES).strip().lower() in [ + "none", + "null", + "nan", + "", + ]: self.log.info( - f"Puck {code}, puck position {location} has no corresponding CompoundSMILES, apo dataset? Skipping..." + f"Puck {code}, puck position {location} has no corresponding CompoundSMILES. Skipping..." ) return {"success": True} # 3. Create the dataset directory - tmp_dir = pathlib.Path("/dls/tmp/xchem_diff2ir") # TEMPORARY RESULTS DIR - processing_dir = tmp_dir / xchem_visit_dir.parts[-1] - model_dir = processing_dir / "analysis" / "auto_model_building" + analysis_dir = processed_dir / "analysis" + pandda_dir = analysis_dir / "pandda2" + model_dir = pandda_dir / "model_building" dataset_dir = model_dir / dtag compound_dir = dataset_dir / "compound" self.log.info(f"Creating directory {dataset_dir}") @@ -636,70 +680,73 @@ def trigger_pandda_xchem( # Copy the dimple files of the selected dataset shutil.copy(pdb, str(dataset_dir / "dimple.pdb")) shutil.copy(mtz, str(dataset_dir / "dimple.mtz")) + shutil.copy( + upstream_mtz, str(dataset_dir / pathlib.Path(upstream_mtz).parts[-1]) + ) with open(compound_dir / f"{CompoundCode}.smiles", "w") as smi_file: smi_file.write(CompoundSMILES) - # 4. Job launch logic - - comparator_threshold = parameters.comparator_threshold + # Create separate pipedream directory + pipedream_dir = analysis_dir / "pipedream" + model_dir_pd = pipedream_dir / "model_building" + dataset_dir_pd = model_dir_pd / dtag + compound_dir_pd = dataset_dir_pd / "compound" + self.log.info(f"Creating directory {dataset_dir_pd}") + pathlib.Path(compound_dir_pd).mkdir(parents=True, exist_ok=True) + shutil.copy(pdb, str(dataset_dir_pd / "dimple.pdb")) + shutil.copy(mtz, str(dataset_dir_pd / "dimple.mtz")) + shutil.copy( + upstream_mtz, str(dataset_dir_pd / pathlib.Path(upstream_mtz).parts[-1]) + ) - if dataset_count < comparator_threshold: - self.log.info( - f"Dataset dataset_count {dataset_count} < PanDDA2 comparator dataset threshold of {comparator_threshold}, skipping for now..."
- ) - return {"success": True} - elif dataset_count == comparator_threshold: - n_datasets = len(dataset_list) - with open(model_dir / ".batch.json", "w") as f: - json.dump(dataset_list, f) - self.log.info( - f"Dataset dataset_count {dataset_count} = comparator_threshold of {comparator_threshold} datasets, launching PanDDA2 array job" - ) - elif dataset_count > comparator_threshold: - n_datasets = 1 - self.log.info(f"Launching single PanDDA2 job for dtag {dtag}") + with open(compound_dir_pd / f"{CompoundCode}.smiles", "w") as smi_file: + smi_file.write(CompoundSMILES) - self.log.debug("PanDDA2 trigger: Starting") + # 4. Job launch logic - pandda_parameters = { - "dcid": dcid, # - "processing_directory": str(processing_dir), + recipe_parameters = { + "dcid": dcid, + "processed_directory": str(processed_dir), "model_directory": str(model_dir), - "dataset_directory": str(dataset_dir), "dtag": dtag, - "n_datasets": n_datasets, + "n_datasets": 1, "scaling_id": scaling_id, "comparator_threshold": comparator_threshold, "database_path": str(db), + "upstream_mtz": pathlib.Path(upstream_mtz).parts[-1], + "smiles": str(CompoundSMILES), } - jp = self.ispyb.mx_processing.get_job_params() - jp["automatic"] = parameters.automatic - # jp["comments"] = parameters.comment - jp["datacollectionid"] = dcid - jp["display_name"] = "PanDDA2" - jp["recipe"] = "postprocessing-pandda2" - self.log.info(jp) - jobid = self.ispyb.mx_processing.upsert_job(list(jp.values())) - self.log.debug(f"PanDDA2 trigger: generated JobID {jobid}") + if dataset_count < comparator_threshold: + self.log.info( + f"Dataset count {dataset_count} < comparator dataset threshold of {comparator_threshold}, skipping PanDDA2 for now..." + ) - for key, value in pandda_parameters.items(): - jpp = self.ispyb.mx_processing.get_job_parameter_params() - jpp["job_id"] = jobid - jpp["parameter_key"] = key - jpp["parameter_value"] = value - jppid = self.ispyb.mx_processing.upsert_job_parameter(list(jpp.values())) - self.log.debug( - f"PanDDA2 trigger: generated JobParameterID {jppid} with {key}={value}" + if pipedream: + self.upsert_proc(rw, dcid, "Pipedream", recipe_parameters) + return {"success": True} + + elif dataset_count == comparator_threshold: + recipe_parameters["n_datasets"] = len(dataset_list) + + with open(model_dir / ".batch.json", "w") as f: + json.dump(dataset_list, f) + + self.log.info( + f"Dataset count {dataset_count} = comparator dataset threshold of {comparator_threshold}, launching PanDDA2 array job" ) + self.upsert_proc(rw, dcid, "PanDDA2", recipe_parameters) - self.log.debug(f"PanDDA2_id trigger: Processing job {jobid} created") + if pipedream: + self.upsert_proc(rw, dcid, "Pipedream", recipe_parameters) - message = {"recipes": [], "parameters": {"ispyb_process": jobid}} - rw.transport.send("processing_recipe", message) + elif dataset_count > comparator_threshold: + self.log.info(f"Launching single PanDDA2 job for dtag {dtag}") + self.upsert_proc(rw, dcid, "PanDDA2", recipe_parameters) - self.log.info(f"PanDDA2_id trigger: Processing job {jobid} triggered") + if pipedream: + self.upsert_proc(rw, dcid, "Pipedream", recipe_parameters) return {"success": True} @@ -735,7 +782,7 @@ def trigger_pandda_xchem_post( dcid = parameters.dcid scaling_id = parameters.scaling_id[0] - processing_directory = pathlib.Path(parameters.processing_directory) + processed_directory = pathlib.Path(parameters.processed_directory) _, ispyb_info = dlstbx.ispybtbx.ispyb_filter({}, {"ispyb_dcid": dcid}, session) visit =
ispyb_info.get("ispyb_visit", "") @@ -743,7 +790,7 @@ def trigger_pandda_xchem_post( visit_number = visit.split("-")[1] # If other PanDDA2 postrun within visit running, quit - min_start_time = datetime.now() - timedelta(hours=0.2) + min_start_time = datetime.now() - timedelta(minutes=20) # from proposal and visit get all dcids query = ( @@ -767,7 +814,7 @@ def trigger_pandda_xchem_post( ) .filter(ProcessingJob.dataCollectionId.in_(dcids)) .filter(ProcessingJob.automatic == True) # noqa E711 - .filter(AutoProcProgram.processingPrograms == "PanDDA2_post") + .filter(AutoProcProgram.processingPrograms == "PanDDA2-post") .filter(AutoProcProgram.recordTimeStamp > min_start_time) .filter( or_( @@ -785,37 +832,12 @@ def trigger_pandda_xchem_post( self.log.debug("PanDDA2 postrun trigger: Starting") - pandda_parameters = { + recipe_parameters = { "dcid": dcid, # - "processing_directory": str(processing_directory), + "processed_directory": str(processed_directory), "scaling_id": scaling_id, } - jp = self.ispyb.mx_processing.get_job_params() - jp["automatic"] = parameters.automatic - # jp["comments"] = parameters.comment - jp["datacollectionid"] = dcid - jp["display_name"] = "PanDDA2_post" - jp["recipe"] = "postprocessing-pandda2-post" - self.log.info(jp) - jobid = self.ispyb.mx_processing.upsert_job(list(jp.values())) - self.log.debug(f"PanDDA2 postrun trigger: generated JobID {jobid}") - - for key, value in pandda_parameters.items(): - jpp = self.ispyb.mx_processing.get_job_parameter_params() - jpp["job_id"] = jobid - jpp["parameter_key"] = key - jpp["parameter_value"] = value - jppid = self.ispyb.mx_processing.upsert_job_parameter(list(jpp.values())) - self.log.debug( - f"PanDDA2 trigger: generated JobParameterID {jppid} with {key}={value}" - ) - - self.log.debug(f"PanDDA2_post trigger: Processing job {jobid} created") - - message = {"recipes": [], "parameters": {"ispyb_process": jobid}} - rw.transport.send("processing_recipe", message) - - self.log.info(f"PanDDA2_post trigger: Processing job {jobid} triggered") + self.upsert_proc(rw, dcid, "PanDDA2-post", recipe_parameters) return {"success": True} diff --git a/src/dlstbx/wrapper/pandda_xchem.py b/src/dlstbx/wrapper/pandda_xchem.py index 5e216d2d0..71dd89866 100644 --- a/src/dlstbx/wrapper/pandda_xchem.py +++ b/src/dlstbx/wrapper/pandda_xchem.py @@ -28,10 +28,11 @@ def run(self): PANDDA_2_DIR = "/dls_sw/i04-1/software/PanDDA2" # database_path = Path(params.get("database_path")) - processing_dir = Path(params.get("processing_directory")) - analysis_dir = Path(processing_dir / "analysis") + processed_dir = Path(params.get("processed_directory")) + analysis_dir = Path(processed_dir / "analysis") + pandda_dir = analysis_dir / "pandda2" model_dir = Path(params.get("model_directory")) - auto_panddas_dir = Path(analysis_dir / "auto_pandda2") + auto_panddas_dir = Path(pandda_dir / "panddas") Path(auto_panddas_dir).mkdir(exist_ok=True) n_datasets = int(params.get("n_datasets")) @@ -66,7 +67,7 @@ def run(self): # acedrg_command = f"module load ccp4; acedrg -i {smiles_file} -o {CompoundCode}" restraints_command = f"module load buster; module load graphviz; \ export CSDHOME=/dls_sw/apps/CSDS/2024.1.0/; export BDG_TOOL_MOGUL=/dls_sw/apps/CSDS/2024.1.0/ccdc-software/mogul/bin/mogul; \ - grade2 --in {smiles_file} --itype smi --out {CompoundCode} -f; " + grade2 --in {smiles_file} --itype smi --out {CompoundCode} -f" try: result = subprocess.run( @@ -125,10 +126,6 @@ def run(self): with open(pandda_log, "w") as log_file: log_file.write(result.stdout) - for item in 
compound_dir.iterdir(): - if item.is_file(): - shutil.copy2(item, ligand_dir / item.name) - modelled_dir = dataset_pdir / "modelled_structures" out_dir = modelled_dir / "rhofit" out_dir.mkdir(parents=True, exist_ok=True) @@ -395,15 +392,3 @@ def update_data_source(self, db_dict, dtag, database_path): cursor = conn.cursor() cursor.execute(sql, db_dict) conn.commit() - - # Integrate back with XCE via datasource - # db_dict = {} - # db_dict["DimplePANDDAwasRun"] = True - # # db_dict["DimplePANDDAreject"] = False - # db_dict["DimplePANDDApath"] = str(auto_panddas_dir / "processed_datasets") - - # try: - # self.update_data_source(db_dict, dtag, database_path) - # self.log.info(f"Updated sqlite database for dataset {dtag}") - # except Exception as e: - # self.log.info(f"Could not update sqlite database for dataset {dtag}: {e}") diff --git a/src/dlstbx/wrapper/pipedream_xchem.py b/src/dlstbx/wrapper/pipedream_xchem.py new file mode 100644 index 000000000..6ccdd43a4 --- /dev/null +++ b/src/dlstbx/wrapper/pipedream_xchem.py @@ -0,0 +1,323 @@ +from __future__ import annotations + +import json +import os +import sqlite3 +import subprocess +from pathlib import Path + +import portalocker + +from dlstbx.wrapper import Wrapper + + +class PipedreamWrapper(Wrapper): + _logger_name = "dlstbx.wrap.pipedream_xchem" + + def run(self): + assert hasattr(self, "recwrap"), "No recipewrapper object found" + self.log.info( + f"Running recipewrap file {self.recwrap.recipe_step['parameters']['recipewrapper']}" + ) + + params = self.recwrap.recipe_step["job_parameters"] + + # database_path = Path(params.get("database_path")) + processed_dir = Path(params.get("processed_directory")) + analysis_dir = Path(processed_dir / "analysis") + pipedream_dir = analysis_dir / "pipedream" + model_dir = pipedream_dir / "model_building" + dtag = params.get("dtag") + smiles = params.get("smiles") + + dataset_dir = model_dir / dtag + out_dir = pipedream_dir / dtag + dimple_pdb = dataset_dir / "dimple.pdb" + dimple_mtz = dataset_dir / "dimple.mtz" + upstream_mtz = dataset_dir / params.get("upstream_mtz") + + self.log.info(f"Processing dtag: {dtag}") + + dataset_dir = model_dir / dtag + compound_dir = dataset_dir / "compound" + + smiles_files = list(compound_dir.glob("*.smiles")) + + if len(smiles_files) == 0: + self.log.error( + f"No .smiles file present in {compound_dir}, cannot continue for dtag {dtag}" + ) + return False + elif len(smiles_files) > 1: + self.log.error( + f"Multiple .smiles files found in {compound_dir}: {smiles_files}, cannot continue for dtag {dtag}" + ) + return False + + smiles_file = smiles_files[0] + CompoundCode = smiles_file.stem + + # ------------------------------------------------------- + restraints_command = f"module load buster; module load graphviz; \ + export CSDHOME=/dls_sw/apps/CSDS/2024.1.0/; export BDG_TOOL_MOGUL=/dls_sw/apps/CSDS/2024.1.0/ccdc-software/mogul/bin/mogul; \ + grade2 --in {smiles_file} --itype smi --out {CompoundCode} -f" + + try: + result = subprocess.run( + restraints_command, + shell=True, + capture_output=True, + text=True, + cwd=compound_dir, + check=True, + timeout=params.get("timeout-minutes") * 60, + ) + + except subprocess.CalledProcessError as e: + self.log.error( + f"Ligand restraint generation command: '{restraints_command}' failed for dataset {dtag}" + ) + + self.log.info(e.stdout) + self.log.error(e.stderr) + return False + + restraints = compound_dir / f"{CompoundCode}.restraints.cif" + restraints.rename(compound_dir / f"{CompoundCode}.cif") + pdb = compound_dir /
f"{CompoundCode}.xyz.pdb" + pdb.rename(compound_dir / f"{CompoundCode}.pdb") + + with open(dataset_dir / "restraints.log", "w") as log_file: + log_file.write(result.stdout) + + ligand_cif = str(compound_dir / f"{CompoundCode}.cif") + self.log.info(f"Restraints generated successfully for dtag {dtag}") + + self.log.info(f"Removing crystallisation components from pdb file for {dtag}") + self.process_pdb_file(dimple_pdb) + self.log.info(f"Launching pipedream for {dtag}") + + # ------------------------------------------------------- + + pipedream_command = f"module load buster; module load graphviz; \ + export BDG_TOOL_MOGUL=/dls_sw/apps/CSDS/2024.1.0/ccdc-software/mogul/bin/mogul; \ + /dls_sw/apps/GPhL/BUSTER/20250717/scripts/pipedream \ + -nolmr \ + -hklin {upstream_mtz} \ + -xyzin {dimple_pdb} \ + -hklref {dimple_mtz} \ + -d {out_dir} \ + -mrefine TLSbasic,WaterUpdatePkmaps \ + -keepwater \ + -remediate \ + -sidechainrebuild \ + -runpepflip \ + -rhocommands \ + -xclusters \ + -nochirals \ + -rhofit {ligand_cif}" + + try: + result = subprocess.run( + pipedream_command, + shell=True, + capture_output=True, + text=True, + cwd=analysis_dir, + check=True, + timeout=params.get("timeout-minutes") * 60, + ) + + except subprocess.CalledProcessError as e: + self.log.error(f"Pipedream command: '{pipedream_command}' failed") + self.log.info(e.stdout) + self.log.error(e.stderr) + return False + + self.log.info(f"Pipedream finished successfully for dtag {dtag}") + + pipedream_summary = f"{out_dir}/pipedream_summary.json" + self.save_dataset_metadata( + str(pipedream_dir), + str(compound_dir), + str(out_dir), + CompoundCode, + smiles, + pipedream_command, + dtag, + ) + + try: + with open(pipedream_summary, "r") as f: + data = json.load(f) + reslo = ( + data.get("dataprocessing", {}) + .get("inputdata", {}) + .get("reslo", None) + ) + reshi = ( + data.get("dataprocessing", {}) + .get("inputdata", {}) + .get("reshigh", None) + ) + except Exception as e: + self.log.info(f"Cannot continue with pipedream postprocessing: {e}") + return True + + # Post-processing: Generate maps and run edstats + postrefine_dir = out_dir / f"postrefine-{CompoundCode}" + refine_mtz = postrefine_dir / "refine.mtz" + refine_pdb = postrefine_dir / "refine.pdb" + map_2fofc = postrefine_dir / "refine_2fofc.map" + map_fofc = postrefine_dir / "refine_fofc.map" + + try: + os.system(f"gemmi sf2map --sample 5 {str(refine_mtz)} {map_2fofc} 2>&1") + os.system(f"gemmi sf2map --diff --sample 5 {str(refine_mtz)} {map_fofc} 2>&1") + except Exception as e: + self.log.debug(f"Cannot continue with pipedream postprocessing: {e}") + return True + + if reslo is None or reshi is None: + self.log.debug( + "Cannot continue with pipedream postprocessing: resolution range None" + ) + return True + + # Run edstats if both maps exist and resolution range is found + if not map_2fofc.exists() or not map_fofc.exists(): + self.log.debug( + "Cannot continue with pipedream postprocessing: maps not found" + ) + return True + + edstats_command = f"module load ccp4; edstats XYZIN {refine_pdb} MAPIN1 {map_2fofc} MAPIN2 {map_fofc} OUT {str(postrefine_dir / 'edstats.out')}" + stdin_text = f"RESLO={reslo}\nRESHI={reshi}\nEND\n" + + try: + result = subprocess.run( + edstats_command, + input=stdin_text, + text=True, + capture_output=True, + check=True, + shell=True, + ) + + except subprocess.CalledProcessError as e: + self.log.error(f"Edstats command: '{edstats_command}' failed") + self.log.info(e.stdout) + self.log.error(e.stderr) + return True + + self.log.info(f"Pipedream
postprocessing finished successfully for dtag {dtag}") + return True + + def process_pdb_file(self, dimple_pdb: Path): + if dimple_pdb.exists(): + with open(dimple_pdb, "r", encoding="utf-8") as f: + lines = f.readlines() + + # Count removals by component type + original_count = len(lines) + components_to_remove = ["DMS", "EDO", "GOL", "SO4", "PO4", "PEG"] + removed_counts = dict.fromkeys(components_to_remove, 0) + + kept_lines = [] + for line in lines: + if any(res in line for res in components_to_remove): + # Count which component was found + for comp in components_to_remove: + if comp in line: + removed_counts[comp] += 1 + break + else: + kept_lines.append(line) + + # Write cleaned file + with open(dimple_pdb, "w", encoding="utf-8") as f: + f.writelines(kept_lines) + + removed_total = original_count - len(kept_lines) + if removed_total > 0: + component_summary = ", ".join( + [ + f"{comp}: {count}" + for comp, count in removed_counts.items() + if count > 0 + ] + ) + self.log.debug(f"Removed {removed_total} lines. ({component_summary})") + + else: + self.log.debug(f"Dimple pdb {dimple_pdb} does not exist") + return True + + def save_dataset_metadata( + self, + pipedream_dir, + input_dir, + output_dir, + compound_code, + smiles_string, + pipedream_cmd, + dtag, + ): + metadata = { + "Input_dir": input_dir, + "CompoundCode": compound_code, + "PipedreamDirectory": output_dir, + "ReportHTML": f"{output_dir}/report-{compound_code}/index.html", + "LigandReportHTML": f"{output_dir}/report-{compound_code}/ligand/index.html", + "ExpectedSummary": f"{output_dir}/pipedream_summary.json", + "PipedreamCommand": pipedream_cmd, + "ExpectedCIF": os.path.join(input_dir, f"{compound_code}.cif"), + "ExpectedPDB": os.path.join(input_dir, f"{compound_code}.pdb"), + "InputSMILES": smiles_string, + } + + output_yaml = {} + output_yaml[dtag] = metadata + json_file = f"{pipedream_dir}/Pipedream_output.json" + + # Acquire a lock + with portalocker.Lock(json_file, "a", timeout=5): + if os.path.exists(json_file) and os.path.getsize(json_file) > 0: + with open(json_file, "r", encoding="utf-8") as f: + try: + data = json.load(f) + except Exception as e: + self.log.debug(f"Could not parse existing {json_file}, overwriting: {e}") + data = {} + # fall back to an empty dict so the new entry is still recorded + else: + data = {} + + data.update(output_yaml) + + with open(json_file, "w", encoding="utf-8") as f: + json.dump(data, f, indent=2) + + def update_data_source(self, db_dict, dtag, database_path): + sql = ( + "UPDATE mainTable SET " + + ", ".join([f"{k} = :{k}" for k in db_dict]) + + f" WHERE CrystalName = '{dtag}'" + ) + conn = sqlite3.connect(database_path) + # conn.execute("PRAGMA journal_mode=WAL;") + cursor = conn.cursor() + cursor.execute(sql, db_dict) + conn.commit() + + # Integrate back with XCE via datasource + # db_dict = {} + # db_dict["DimplePANDDAwasRun"] = True + # # db_dict["DimplePANDDAreject"] = False + # db_dict["DimplePANDDApath"] = str(auto_panddas_dir / "processed_datasets") + + # try: + # self.update_data_source(db_dict, dtag, database_path) + # self.log.info(f"Updated sqlite database for dataset {dtag}") + # except Exception as e: + # self.log.info(f"Could not update sqlite database for dataset {dtag}: {e}")
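A note on the upstream MTZ lookup added to trigger_pandda_xchem: the new code assumes that the chosen AutoProcProgram.processingEnvironment string embeds a Python-style list of file paths as data=[...], which is extracted with re.search and parsed with ast.literal_eval, and the first entry is treated as the upstream MTZ to copy next to the dimple files. A minimal sketch of that parsing, using a made-up environment string (the real value written by the upstream pipeline may be formatted differently):

    import ast
    import re

    # Hypothetical processingEnvironment value; only the data=[...] part matters here.
    environment = "module=autoproc data=['/dls/example/visit/processed/example_free.mtz'] host=example-node"

    match = re.search(r"data=(\[[^\]]*\])", environment)
    if match:
        # ast.literal_eval turns the captured "[...]" text into a Python list;
        # the trigger takes the first entry as the upstream MTZ path.
        upstream_mtz = ast.literal_eval(match.group(1))[0]
        print(upstream_mtz)
    else:
        # No data=[...] block found, so there is no MTZ path to recover.
        print("no environment information")

If the captured list were empty, taking element [0] would raise an IndexError, so a length check before indexing may be worth considering.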
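The crystallisation-component stripping in PipedreamWrapper.process_pdb_file drops every line of the dimple PDB whose text mentions one of the listed residue codes. A standalone sketch of the same filtering idea on an in-memory example (illustrative only; the wrapper method also counts removals per component and rewrites the file in place):

    components_to_remove = ["DMS", "EDO", "GOL", "SO4", "PO4", "PEG"]

    # Hypothetical dimple.pdb records: one protein atom, one DMSO atom, one glycerol atom.
    pdb_lines = [
        "ATOM      1  CA  ALA A   1      11.104  13.207   2.100  1.00 20.00           C",
        "HETATM  900  S   DMS A 201       5.000   5.000   5.000  1.00 30.00           S",
        "HETATM  901  C1  GOL A 202       6.000   6.000   6.000  1.00 30.00           C",
    ]

    kept_lines = [
        line
        for line in pdb_lines
        if not any(res in line for res in components_to_remove)
    ]
    print(kept_lines)  # only the ALA record is kept

Because this is a plain substring test, any record whose text happens to contain one of these three-letter codes is also dropped; matching on the PDB residue-name column would be stricter.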