Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 2 additions & 8 deletions biasanalyzer/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,17 +49,11 @@ def set_root_omop(self):
db = self.config['root_omop_cdm_database']['database']
db_url = f"postgresql://{user}:{password}@{host}:{port}/{db}"
self.omop_cdm_db = OMOPCDMDatabase(db_url)
self.bias_db = BiasDatabase(':memory:')
# load postgres extension in duckdb bias_db so that cohorts in duckdb can be joined
# with OMOP CDM tables in omop_cdm_db
self.bias_db.load_postgres_extension()
self.bias_db.omop_cdm_db_url = db_url

self.bias_db = BiasDatabase(':memory:', omop_db_url=db_url)
elif db_type == 'duckdb':
db_path = self.config['root_omop_cdm_database'].get('database', ":memory:")
self.omop_cdm_db = OMOPCDMDatabase(db_path)
self.bias_db = BiasDatabase(db_path)
self.bias_db.omop_cdm_db_url = db_path
self.bias_db = BiasDatabase(':memory:', omop_db_url=db_path)
else:
notify_users(f"Unsupported database type: {db_type}")

Expand Down
55 changes: 28 additions & 27 deletions biasanalyzer/cohort.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,4 @@
from sqlalchemy.exc import SQLAlchemyError
from functools import reduce
import duckdb
import pandas as pd
from datetime import datetime
from tqdm.auto import tqdm
Expand Down Expand Up @@ -122,31 +120,34 @@ def create_cohort(self, cohort_name: str, description: str, query_or_yaml_file:
try:
# Execute read-only query from OMOP CDM database
result = self.omop_db.execute_query(query)
# Create CohortDefinition
cohort_def = CohortDefinition(
name=cohort_name,
description=description,
created_date=datetime.now().date(),
creation_info=clean_string(query),
created_by=created_by
)
cohort_def_id = self.bias_db.create_cohort_definition(cohort_def, progress_obj=tqdm)
progress.update(1)

progress.set_postfix_str(stages[2])
# Store cohort_definition and cohort data into BiasDatabase
cohort_df = pd.DataFrame(result)
cohort_df['cohort_definition_id'] = cohort_def_id
cohort_df = cohort_df.rename(columns={"person_id": "subject_id"})
self.bias_db.create_cohort_in_bulk(cohort_df)
progress.update(1)

tqdm.write(f"Cohort {cohort_name} successfully created.")
return CohortData(cohort_id=cohort_def_id, bias_db=self.bias_db, omop_db=self.omop_db)
except duckdb.Error as e:
notify_users(f"Error executing query: {e}")
return None
except SQLAlchemyError as e:
if result:
# Create CohortDefinition
cohort_def = CohortDefinition(
name=cohort_name,
description=description,
created_date=datetime.now().date(),
creation_info=clean_string(query),
created_by=created_by
)
cohort_def_id = self.bias_db.create_cohort_definition(cohort_def, progress_obj=tqdm)
progress.update(1)

progress.set_postfix_str(stages[2])
# Store cohort_definition and cohort data into BiasDatabase
cohort_df = pd.DataFrame(result)
cohort_df['cohort_definition_id'] = cohort_def_id
cohort_df = cohort_df.rename(columns={"person_id": "subject_id"})
self.bias_db.create_cohort_in_bulk(cohort_df)
progress.update(1)

tqdm.write(f"Cohort {cohort_name} successfully created.")
return CohortData(cohort_id=cohort_def_id, bias_db=self.bias_db, omop_db=self.omop_db)
else:
progress.update(2)
notify_users(f"No cohort is created due to empty results being returned from query")
return None
except Exception as e:
progress.update(2)
notify_users(f"Error executing query: {e}")
if omop_session is not None:
omop_session.close()
Expand Down
7 changes: 6 additions & 1 deletion biasanalyzer/cohort_query_builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,9 +71,12 @@ def build_query_cohort_creation(self, cohort_config: dict) -> str:
temporal_events=temporal_events
)

def build_concept_prevalence_query(self, concept_type: str, cid: int, filter_count: int, vocab: str) -> str:
def build_concept_prevalence_query(self, db_schema: str, omop_alias: str, concept_type: str, cid: int,
filter_count: int, vocab: str) -> str:
"""
Build a SQL query for concept prevalence statistics for a given domain and cohort.
:param db_schema: BiasDatabase database schema under which all tables are stored.
:param omop_alias: OMOP database alias attached to the BiasDataBase in-memory duckdb
:param concept_type: Domain from DOMAIN_MAPPING (e.g., 'condition_occurrence').
:param cid: Cohort definition ID.
:param filter_count: Minimum count threshold for concepts with 0 meaning no filtering
Expand All @@ -93,6 +96,8 @@ def build_concept_prevalence_query(self, concept_type: str, cid: int, filter_cou
# Load and render the template
template = self.env.get_template("cohort_concept_prevalence_query.sql.j2")
return template.render(
db_schema=db_schema,
omop=omop_alias,
table_name=DOMAIN_MAPPING[concept_type]["table"],
concept_id_column=DOMAIN_MAPPING[concept_type]["concept_id"],
start_date_column=DOMAIN_MAPPING[concept_type]["start_date"],
Expand Down
Loading