def _get_max_workers() -> int:
    """Return the concurrency limit used for this flow's task runner.

    Reads the ``AUTH_MAX_WORKERS`` environment variable.

    Returns:
        The configured value when it parses as a strictly positive integer;
        otherwise 50 (variable unset, empty, non-numeric, zero or negative).
    """
    default_workers = 50
    try:
        workers = int(os.getenv('AUTH_MAX_WORKERS', str(default_workers)))
    except ValueError:
        # os.getenv always yields a str here, so ValueError is the only failure
        # int() can produce; anything else would be a genuine bug and must surface.
        return default_workers
    return workers if workers > 0 else default_workers
@flow(
    name='Auth-Affiliation-Flow',
    log_prints=True,
    persist_result=False,
    task_runner=ConcurrentTaskRunner(max_workers=_get_max_workers())
)
def auth_affiliation_flow():
    """
    Create OR delete affiliations (mutually exclusive for this run).

    - Create mode: uses AuthCreatePlan(create_affiliations=True)
    - Delete mode: uses AuthDeletePlan(delete_affiliations=True)

    Selection excludes any corp already tracked in auth_processing for (corp_num, FLOW_NAME, environment).

    The flow counts reservable corps, reserves at most AUTH_BATCHES * AUTH_BATCH_SIZE of them, then
    claims and processes one batch per round, writing a status row update for every claimed corp.

    Returns:
        None when there is nothing to process or every corp completed; a ``Failed`` state when the auth
        token cannot be obtained or at least one corp failed.

    Raises:
        ValueError: when both or neither of AUTH_CREATE_AFFILIATIONS / AUTH_DELETE_AFFILIATIONS are set,
            when AUTH_SELECTION_MODE is unknown, or when AUTH_BATCHES / AUTH_BATCH_SIZE is not positive.
    """
    config = get_config()
    colin_engine = colin_extract_init(config)
    selection_mode = _parse_selection_mode(config)

    do_create = bool(getattr(config, 'AUTH_CREATE_AFFILIATIONS', False))
    do_delete = bool(getattr(config, 'AUTH_DELETE_AFFILIATIONS', False))

    if do_create and do_delete:
        raise ValueError('Invalid config: cannot both AUTH_CREATE_AFFILIATIONS and AUTH_DELETE_AFFILIATIONS in one run')
    if not do_create and not do_delete:
        raise ValueError('Nothing to do: set either AUTH_CREATE_AFFILIATIONS or AUTH_DELETE_AFFILIATIONS')

    create_plan = None
    delete_plan = None
    if do_create:
        create_plan = AuthCreatePlan(
            create_entity=bool(getattr(config, 'AUTH_CREATE_ENTITY', True)),
            upsert_contact=False,
            create_affiliations=True,
            send_unaffiliated_invite=False,
            fail_if_missing_email=False,
            dry_run=bool(getattr(config, 'AUTH_DRY_RUN', False)),
        )
        plan_desc = create_plan
    else:
        delete_plan = AuthDeletePlan(
            delete_affiliations=True,
            delete_entity=False,
            delete_invites=False,
            dry_run=bool(getattr(config, 'AUTH_DRY_RUN', False)),
        )
        plan_desc = delete_plan

    # Entity action recorded whenever a corp fails before/without a task result:
    # only create mode with entity creation enabled would have touched the entity.
    entity_failure_action = 'FAILED' if (create_plan and create_plan.create_entity) else 'NOT_RUN'

    # Count reservable
    count_sql = get_auth_reservable_count_query(
        flow_name=FLOW_NAME,
        config=config,
        selection_mode=selection_mode
    )
    with colin_engine.connect() as conn:
        total_reservable = int(conn.execute(text(count_sql)).scalar() or 0)

    if total_reservable <= 0:
        print('No reservable corps found for this run.')
        return

    # `or 0` makes an attribute that exists but is None fail with the intended
    # ValueError instead of a TypeError from comparing None with an int.
    max_batches = getattr(config, 'AUTH_BATCHES', 0) or 0
    batch_size = getattr(config, 'AUTH_BATCH_SIZE', 0) or 0
    if max_batches <= 0:
        raise ValueError('AUTH_BATCHES must be explicitly set to a positive integer')
    if batch_size <= 0:
        raise ValueError('AUTH_BATCH_SIZE must be explicitly set to a positive integer')

    max_corps = min(total_reservable, max_batches * batch_size)

    flow_run_id = get_run_context().flow_run.id

    tracking = ExtractTrackingService(
        config.DATA_LOAD_ENV,
        colin_engine,
        FLOW_NAME,
        table_name='auth_processing',
        statement_timeout_ms=getattr(config, 'RESERVE_STATEMENT_TIMEOUT_MS', None)
    )

    extra_insert_cols = ['account_ids']

    base_query = get_auth_reservable_corps_query(
        flow_name=FLOW_NAME,
        config=config,
        batch_size=max_corps,
        selection_mode=selection_mode,
        include_account_ids=True,
        include_contact_email=False
    )

    reserved = tracking.reserve_for_flow(
        base_query=base_query,
        flow_run_id=flow_run_id,
        extra_insert_cols=extra_insert_cols,
        fallback_account_ids=config.AFFILIATE_ENTITY_ACCOUNT_IDS_CSV
    )

    if reserved <= 0:
        print('No corps reserved (cohort may be exhausted or already reserved).')
        return

    batches = min(math.ceil(reserved / batch_size), max_batches)

    print(f'👷 Auth affiliation mode: {"CREATE" if do_create else "DELETE"}')
    print(f'👷 Plan: {plan_desc}')
    print(f'👷 Reservable={total_reservable}, Reserved={reserved}, Batches={batches}, BatchSize={batch_size}')
    print(f'👷 SelectionMode={selection_mode.value}')

    cnt = 0
    total_failed = 0
    total_completed = 0

    while cnt < batches:
        claimed = tracking.claim_batch(
            flow_run_id,
            batch_size,
            extra_return_cols=extra_insert_cols,
            as_dict=True
        )
        if not claimed:
            print('No more corps available to claim')
            break

        corp_nums = [r['corp_num'] for r in claimed]
        corp_accounts = {r['corp_num']: (r.get('account_ids') or None) for r in claimed}

        # Business profiles are only needed to create entities/affiliations.
        profiles = _fetch_profiles(colin_engine, corp_nums, getattr(config, 'CORP_NAME_SUFFIX', '') or '') if do_create else {}

        # A token is requested once per round and shared by every task in the batch.
        try:
            token = get_auth_token(config)
        except Exception as e:
            err = f'Failed to obtain auth token: {repr(e)}'
            print(f'❌ {err}')
            # Nothing in this batch can run without a token: fail every claimed corp and stop.
            for corp_num in corp_nums:
                tracking.update_corp_status(
                    flow_run_id,
                    corp_num,
                    ProcessingStatuses.FAILED,
                    error=err,
                    entity_action=entity_failure_action,
                    contact_action='NOT_RUN',
                    affiliation_action='FAILED',
                    invite_action='NOT_RUN',
                    action_detail='token_error'
                )
            return Failed(message=err)

        # Keep each future paired with its corp so a crashed task can still be attributed.
        submitted = []
        for corp_num in corp_nums:
            accounts = parse_accounts_csv(corp_accounts.get(corp_num))

            if do_create:
                profile = profiles.get(corp_num)
                if not profile:
                    total_failed += 1
                    tracking.update_corp_status(
                        flow_run_id,
                        corp_num,
                        ProcessingStatuses.FAILED,
                        error='Missing business profile for corp in COLIN extract',
                        entity_action=entity_failure_action,
                        contact_action='NOT_RUN',
                        affiliation_action='FAILED',
                        invite_action='NOT_RUN',
                        action_detail='profile_missing'
                    )
                    continue

                future = perform_auth_create_for_corp.submit(
                    config,
                    corp_num,
                    profile,
                    accounts,
                    create_plan,
                    token
                )
            else:
                future = perform_auth_delete_for_corp.submit(
                    config,
                    corp_num,
                    accounts,
                    delete_plan,
                    token
                )
            submitted.append((corp_num, future))

        wait([future for _, future in submitted])

        for corp_num, future in submitted:
            try:
                res = future.result()
            except Exception as e:
                # result() re-raises when the task itself crashed; record the corp as
                # failed instead of aborting the flow and stranding the rest of the batch.
                err = f'Task raised an unhandled exception: {repr(e)}'
                print(f'❌ {corp_num}: {err}')
                total_failed += 1
                tracking.update_corp_status(
                    flow_run_id,
                    corp_num,
                    ProcessingStatuses.FAILED,
                    error=err,
                    entity_action=entity_failure_action,
                    contact_action='NOT_RUN',
                    affiliation_action='FAILED',
                    invite_action='NOT_RUN',
                    action_detail='task_exception'
                )
                continue

            actions = [
                res.get('entity_action'),
                res.get('contact_action'),
                res.get('affiliation_action'),
                res.get('invite_action'),
            ]
            # Any single failed action marks the whole corp as failed.
            failed = any(a == 'FAILED' for a in actions if a)
            status = ProcessingStatuses.FAILED if failed else ProcessingStatuses.COMPLETED

            tracking.update_corp_status(
                flow_run_id,
                res.get('corp_num') or corp_num,
                status,
                error=res.get('error'),
                entity_action=res.get('entity_action'),
                contact_action=res.get('contact_action'),
                affiliation_action=res.get('affiliation_action'),
                invite_action=res.get('invite_action'),
                action_detail=res.get('action_detail')
            )

            if status == ProcessingStatuses.FAILED:
                total_failed += 1
            else:
                total_completed += 1

        cnt += 1
        print(f'🌟 Complete round {cnt}/{batches}. Completed={total_completed}, Failed={total_failed}')

    if total_failed > 0:
        return Failed(message=f'{total_failed} corps failed in {FLOW_NAME}.')

    print(f'🌰 {FLOW_NAME} complete. Completed={total_completed}, Failed={total_failed}')
@flow(
    name='Auth-Contact-Flow',
    log_prints=True,
    persist_result=False,
    task_runner=ConcurrentTaskRunner(max_workers=_get_max_workers())
)
def auth_contact_flow():
    """
    Upsert entity contact email (no entity creation, no affiliations, no invite).

    Selection excludes any corp already tracked in auth_processing for (corp_num, FLOW_NAME, environment).

    The flow counts reservable corps, reserves at most AUTH_BATCHES * AUTH_BATCH_SIZE of them, then
    claims and processes one batch per round, writing a status row update for every claimed corp.

    Returns:
        None when there is nothing to process or every corp completed; a ``Failed`` state when the auth
        token cannot be obtained or at least one corp failed.

    Raises:
        ValueError: when AUTH_SELECTION_MODE is unknown or AUTH_BATCHES / AUTH_BATCH_SIZE is not positive.
    """
    config = get_config()
    colin_engine = colin_extract_init(config)
    selection_mode = _parse_selection_mode(config)

    plan = AuthCreatePlan(
        create_entity=False,
        upsert_contact=True,
        create_affiliations=False,
        send_unaffiliated_invite=False,
        fail_if_missing_email=bool(getattr(config, 'AUTH_FAIL_IF_MISSING_EMAIL', False)),
        dry_run=bool(getattr(config, 'AUTH_DRY_RUN', False)),
    )

    count_sql = get_auth_reservable_count_query(
        flow_name=FLOW_NAME,
        config=config,
        selection_mode=selection_mode
    )
    with colin_engine.connect() as conn:
        total_reservable = int(conn.execute(text(count_sql)).scalar() or 0)

    if total_reservable <= 0:
        print('No reservable corps found for this run.')
        return

    # `or 0` makes an attribute that exists but is None fail with the intended
    # ValueError instead of a TypeError from comparing None with an int.
    max_batches = getattr(config, 'AUTH_BATCHES', 0) or 0
    batch_size = getattr(config, 'AUTH_BATCH_SIZE', 0) or 0
    if max_batches <= 0:
        raise ValueError('AUTH_BATCHES must be explicitly set to a positive integer')
    if batch_size <= 0:
        raise ValueError('AUTH_BATCH_SIZE must be explicitly set to a positive integer')

    max_corps = min(total_reservable, max_batches * batch_size)

    flow_run_id = get_run_context().flow_run.id

    tracking = ExtractTrackingService(
        config.DATA_LOAD_ENV,
        colin_engine,
        FLOW_NAME,
        table_name='auth_processing',
        statement_timeout_ms=getattr(config, 'RESERVE_STATEMENT_TIMEOUT_MS', None)
    )

    extra_insert_cols = ['contact_email']

    base_query = get_auth_reservable_corps_query(
        flow_name=FLOW_NAME,
        config=config,
        batch_size=max_corps,
        selection_mode=selection_mode,
        include_account_ids=False,
        include_contact_email=True
    )

    reserved = tracking.reserve_for_flow(
        base_query=base_query,
        flow_run_id=flow_run_id,
        extra_insert_cols=extra_insert_cols
    )

    if reserved <= 0:
        print('No corps reserved (cohort may be exhausted or already reserved).')
        return

    batches = min(math.ceil(reserved / batch_size), max_batches)

    print(f'👷 Auth contact plan: {plan}')
    print(f'👷 Reservable={total_reservable}, Reserved={reserved}, Batches={batches}, BatchSize={batch_size}')
    print(f'👷 SelectionMode={selection_mode.value}, DryRun={plan.dry_run}')

    cnt = 0
    total_failed = 0
    total_completed = 0

    while cnt < batches:
        claimed = tracking.claim_batch(
            flow_run_id,
            batch_size,
            extra_return_cols=extra_insert_cols,
            as_dict=True
        )
        if not claimed:
            print('No more corps available to claim')
            break

        corp_nums = [r['corp_num'] for r in claimed]
        profiles = _fetch_profiles(colin_engine, corp_nums, getattr(config, 'CORP_NAME_SUFFIX', '') or '')

        # A token is requested once per round and shared by every task in the batch.
        try:
            token = get_auth_token(config)
        except Exception as e:
            err = f'Failed to obtain auth token: {repr(e)}'
            print(f'❌ {err}')
            # Nothing in this batch can run without a token: fail every claimed corp and stop.
            for corp_num in corp_nums:
                tracking.update_corp_status(
                    flow_run_id,
                    corp_num,
                    ProcessingStatuses.FAILED,
                    error=err,
                    entity_action='NOT_RUN',
                    contact_action='FAILED',
                    affiliation_action='NOT_RUN',
                    invite_action='NOT_RUN',
                    action_detail='token_error'
                )
            return Failed(message=err)

        # Keep each future paired with its corp so a crashed task can still be attributed.
        submitted = []
        for corp_num in corp_nums:
            profile = profiles.get(corp_num)
            if not profile:
                total_failed += 1
                tracking.update_corp_status(
                    flow_run_id,
                    corp_num,
                    ProcessingStatuses.FAILED,
                    error='Missing business profile for corp in COLIN extract',
                    entity_action='NOT_RUN',
                    contact_action='FAILED',
                    affiliation_action='NOT_RUN',
                    invite_action='NOT_RUN',
                    action_detail='profile_missing'
                )
                continue

            future = perform_auth_create_for_corp.submit(
                config,
                corp_num,
                profile,
                [],  # this flow never touches affiliations, so no account ids are passed
                plan,
                token
            )
            submitted.append((corp_num, future))

        wait([future for _, future in submitted])

        for corp_num, future in submitted:
            try:
                res = future.result()
            except Exception as e:
                # result() re-raises when the task itself crashed; record the corp as
                # failed instead of aborting the flow and stranding the rest of the batch.
                err = f'Task raised an unhandled exception: {repr(e)}'
                print(f'❌ {corp_num}: {err}')
                total_failed += 1
                tracking.update_corp_status(
                    flow_run_id,
                    corp_num,
                    ProcessingStatuses.FAILED,
                    error=err,
                    entity_action='NOT_RUN',
                    contact_action='FAILED',
                    affiliation_action='NOT_RUN',
                    invite_action='NOT_RUN',
                    action_detail='task_exception'
                )
                continue

            # Only the contact action decides success for this flow.
            failed = res.get('contact_action') == 'FAILED'
            status = ProcessingStatuses.FAILED if failed else ProcessingStatuses.COMPLETED

            tracking.update_corp_status(
                flow_run_id,
                res.get('corp_num') or corp_num,
                status,
                error=res.get('error'),
                entity_action=res.get('entity_action'),
                contact_action=res.get('contact_action'),
                affiliation_action=res.get('affiliation_action'),
                invite_action=res.get('invite_action'),
                action_detail=res.get('action_detail')
            )

            if status == ProcessingStatuses.FAILED:
                total_failed += 1
            else:
                total_completed += 1

        cnt += 1
        print(f'🌟 Complete round {cnt}/{batches}. Completed={total_completed}, Failed={total_failed}')

    if total_failed > 0:
        return Failed(message=f'{total_failed} corps failed in {FLOW_NAME}.')

    print(f'🌰 {FLOW_NAME} complete. Completed={total_completed}, Failed={total_failed}')
+ persist_result=False, + task_runner=ConcurrentTaskRunner(max_workers=_get_max_workers()) +) +def auth_create_flow(): + """ + Create/ensure entities, optionally upsert contact, optionally create affiliations, + optionally send unaffiliated invite (mutually exclusive with affiliations). + + Selection excludes any corp already tracked in auth_processing for (corp_num, FLOW_NAME, environment). + """ + config = get_config() + colin_engine = colin_extract_init(config) + + selection_mode = _parse_selection_mode(config) + + plan = AuthCreatePlan( + create_entity=bool(getattr(config, 'AUTH_CREATE_ENTITY', True)), + upsert_contact=bool(getattr(config, 'AUTH_UPSERT_CONTACT', False)), + create_affiliations=bool(getattr(config, 'AUTH_CREATE_AFFILIATIONS', False)), + send_unaffiliated_invite=bool(getattr(config, 'AUTH_SEND_UNAFFILIATED_EMAIL', False)), + fail_if_missing_email=bool(getattr(config, 'AUTH_FAIL_IF_MISSING_EMAIL', False)), + dry_run=bool(getattr(config, 'AUTH_DRY_RUN', False)), + ) + + if plan.create_affiliations and plan.send_unaffiliated_invite: + raise ValueError('Invalid plan: cannot both create affiliations and send unaffiliated invite') + + # Count reservable + count_sql = get_auth_reservable_count_query( + flow_name=FLOW_NAME, + config=config, + selection_mode=selection_mode + ) + with colin_engine.connect() as conn: + total_reservable = int(conn.execute(text(count_sql)).scalar() or 0) + + if total_reservable <= 0: + print('No reservable corps found for this run.') + return + + # Throughput config + if getattr(config, 'AUTH_BATCHES', 0) <= 0: + raise ValueError('AUTH_BATCHES must be explicitly set to a positive integer') + if getattr(config, 'AUTH_BATCH_SIZE', 0) <= 0: + raise ValueError('AUTH_BATCH_SIZE must be explicitly set to a positive integer') + + batch_size = config.AUTH_BATCH_SIZE + max_corps = min(total_reservable, config.AUTH_BATCHES * config.AUTH_BATCH_SIZE) + + flow_run_id = get_run_context().flow_run.id + + tracking = ExtractTrackingService( + 
config.DATA_LOAD_ENV, + colin_engine, + FLOW_NAME, + table_name='auth_processing', + statement_timeout_ms=getattr(config, 'RESERVE_STATEMENT_TIMEOUT_MS', None) + ) + + include_account_ids = bool(plan.create_affiliations) + include_contact_email = bool(plan.upsert_contact or plan.send_unaffiliated_invite or plan.fail_if_missing_email) + + extra_insert_cols: List[str] = [] + if include_account_ids: + extra_insert_cols.append('account_ids') + if include_contact_email: + extra_insert_cols.append('contact_email') + + base_query = get_auth_reservable_corps_query( + flow_name=FLOW_NAME, + config=config, + batch_size=max_corps, + selection_mode=selection_mode, + include_account_ids=include_account_ids, + include_contact_email=include_contact_email + ) + + fallback_accounts = config.AFFILIATE_ENTITY_ACCOUNT_IDS_CSV if include_account_ids else None + reserved = tracking.reserve_for_flow( + base_query=base_query, + flow_run_id=flow_run_id, + extra_insert_cols=extra_insert_cols or None, + fallback_account_ids=fallback_accounts + ) + + if reserved <= 0: + print('No corps reserved (cohort may be exhausted or already reserved).') + return + + batches = min(math.ceil(reserved / batch_size), config.AUTH_BATCHES) + + print(f'👷 Auth create plan: {plan}') + print(f'👷 Reservable={total_reservable}, Reserved={reserved}, Batches={batches}, BatchSize={batch_size}') + print(f'👷 SelectionMode={selection_mode.value}, DryRun={plan.dry_run}') + + cnt = 0 + total_failed = 0 + total_completed = 0 + + while cnt < batches: + claimed = tracking.claim_batch( + flow_run_id, + batch_size, + extra_return_cols=extra_insert_cols or None, + as_dict=True + ) + if not claimed: + print('No more corps available to claim') + break + + corp_nums = [r['corp_num'] for r in claimed] + corp_accounts = {r['corp_num']: (r.get('account_ids') or None) for r in claimed} if include_account_ids else {} + + profiles = _fetch_profiles(colin_engine, corp_nums, getattr(config, 'CORP_NAME_SUFFIX', '') or '') + + try: + token = 
get_auth_token(config) + except Exception as e: + err = f'Failed to obtain auth token: {repr(e)}' + print(f'❌ {err}') + for corp_num in corp_nums: + tracking.update_corp_status( + flow_run_id, + corp_num, + ProcessingStatuses.FAILED, + error=err, + entity_action='FAILED' if plan.create_entity else 'NOT_RUN', + contact_action='FAILED' if plan.upsert_contact else 'NOT_RUN', + affiliation_action='FAILED' if plan.create_affiliations else 'NOT_RUN', + invite_action='FAILED' if plan.send_unaffiliated_invite else 'NOT_RUN', + action_detail='token_error' + ) + return Failed(message=err) + + futures = [] + for corp_num in corp_nums: + profile = profiles.get(corp_num) + if not profile: + total_failed += 1 + tracking.update_corp_status( + flow_run_id, + corp_num, + ProcessingStatuses.FAILED, + error='Missing business profile for corp in COLIN extract', + entity_action='FAILED' if plan.create_entity else 'NOT_RUN', + contact_action='FAILED' if plan.upsert_contact else 'NOT_RUN', + affiliation_action='FAILED' if plan.create_affiliations else 'NOT_RUN', + invite_action='FAILED' if plan.send_unaffiliated_invite else 'NOT_RUN', + action_detail='profile_missing' + ) + continue + + accounts = parse_accounts_csv(corp_accounts.get(corp_num)) if include_account_ids else [] + futures.append( + perform_auth_create_for_corp.submit( + config, + corp_num, + profile, + accounts, + plan, + token + ) + ) + + wait(futures) + + for f in futures: + res = f.result() + actions = [ + res.get('entity_action'), + res.get('contact_action'), + res.get('affiliation_action'), + res.get('invite_action'), + ] + failed = any(a == 'FAILED' for a in actions if a) + status = ProcessingStatuses.FAILED if failed else ProcessingStatuses.COMPLETED + + tracking.update_corp_status( + flow_run_id, + res['corp_num'], + status, + error=res.get('error'), + entity_action=res.get('entity_action'), + contact_action=res.get('contact_action'), + affiliation_action=res.get('affiliation_action'), + 
@flow(
    name='Auth-Delete-Flow',
    log_prints=True,
    persist_result=False,
    task_runner=ConcurrentTaskRunner(max_workers=_get_max_workers())
)
def auth_delete_flow():
    """
    Delete affiliations (optional) and delete entity (optional).

    SAFETY: If AUTH_REQUIRE_CONFIRMATION is True, AUTH_CONFIRMATION_TOKEN must be set (non-empty),
    or the flow will fail fast before reserving/claiming.

    Selection excludes any corp already tracked in auth_processing for (corp_num, FLOW_NAME, environment).

    The flow counts reservable corps, reserves at most AUTH_BATCHES * AUTH_BATCH_SIZE of them, then
    claims and processes one batch per round, writing a status row update for every claimed corp.

    Returns:
        None when there is nothing to process or every corp completed; a ``Failed`` state when the auth
        token cannot be obtained or at least one corp failed.

    Raises:
        ValueError: when confirmation is required but no token is set, when AUTH_SELECTION_MODE is
            unknown, or when AUTH_BATCHES / AUTH_BATCH_SIZE is not positive.
    """
    config = get_config()
    colin_engine = colin_extract_init(config)

    # Safety gate
    if bool(getattr(config, 'AUTH_REQUIRE_CONFIRMATION', False)):
        if not (getattr(config, 'AUTH_CONFIRMATION_TOKEN', '') or '').strip():
            raise ValueError('AUTH_REQUIRE_CONFIRMATION is True but AUTH_CONFIRMATION_TOKEN is not set.')
        print('🛑 Delete confirmation token is present (value not displayed). Proceeding.')

    selection_mode = _parse_selection_mode(config)

    plan = AuthDeletePlan(
        delete_affiliations=bool(getattr(config, 'AUTH_DELETE_AFFILIATIONS', False)),
        delete_entity=bool(getattr(config, 'AUTH_DELETE_ENTITY', False)),
        delete_invites=bool(getattr(config, 'AUTH_DELETE_INVITES', False)),
        dry_run=bool(getattr(config, 'AUTH_DRY_RUN', False)),
    )

    # Actions recorded whenever a corp fails without a task result: every step the
    # plan would have attempted is FAILED, everything else NOT_RUN.
    failure_actions = {
        'entity_action': 'FAILED' if plan.delete_entity else 'NOT_RUN',
        'contact_action': 'NOT_RUN',
        'affiliation_action': 'FAILED' if plan.delete_affiliations else 'NOT_RUN',
        'invite_action': 'FAILED' if plan.delete_invites else 'NOT_RUN',
    }

    # Count reservable
    count_sql = get_auth_reservable_count_query(
        flow_name=FLOW_NAME,
        config=config,
        selection_mode=selection_mode
    )
    with colin_engine.connect() as conn:
        total_reservable = int(conn.execute(text(count_sql)).scalar() or 0)

    if total_reservable <= 0:
        print('No reservable corps found for this run.')
        return

    # Throughput config
    # `or 0` makes an attribute that exists but is None fail with the intended
    # ValueError instead of a TypeError from comparing None with an int.
    max_batches = getattr(config, 'AUTH_BATCHES', 0) or 0
    batch_size = getattr(config, 'AUTH_BATCH_SIZE', 0) or 0
    if max_batches <= 0:
        raise ValueError('AUTH_BATCHES must be explicitly set to a positive integer')
    if batch_size <= 0:
        raise ValueError('AUTH_BATCH_SIZE must be explicitly set to a positive integer')

    max_corps = min(total_reservable, max_batches * batch_size)

    flow_run_id = get_run_context().flow_run.id

    tracking = ExtractTrackingService(
        config.DATA_LOAD_ENV,
        colin_engine,
        FLOW_NAME,
        table_name='auth_processing',
        statement_timeout_ms=getattr(config, 'RESERVE_STATEMENT_TIMEOUT_MS', None)
    )

    # Account ids are only reserved/claimed when affiliations will be deleted.
    include_account_ids = bool(plan.delete_affiliations)

    extra_insert_cols: List[str] = []
    if include_account_ids:
        extra_insert_cols.append('account_ids')

    base_query = get_auth_reservable_corps_query(
        flow_name=FLOW_NAME,
        config=config,
        batch_size=max_corps,
        selection_mode=selection_mode,
        include_account_ids=include_account_ids,
        include_contact_email=False
    )

    fallback_accounts = config.AFFILIATE_ENTITY_ACCOUNT_IDS_CSV if include_account_ids else None
    reserved = tracking.reserve_for_flow(
        base_query=base_query,
        flow_run_id=flow_run_id,
        extra_insert_cols=extra_insert_cols or None,
        fallback_account_ids=fallback_accounts
    )

    if reserved <= 0:
        print('No corps reserved (cohort may be exhausted or already reserved).')
        return

    batches = min(math.ceil(reserved / batch_size), max_batches)

    print(f'👷 Auth delete plan: {plan}')
    print(f'👷 Reservable={total_reservable}, Reserved={reserved}, Batches={batches}, BatchSize={batch_size}')
    print(f'👷 SelectionMode={selection_mode.value}, DryRun={plan.dry_run}')

    cnt = 0
    total_failed = 0
    total_completed = 0

    while cnt < batches:
        claimed = tracking.claim_batch(
            flow_run_id,
            batch_size,
            extra_return_cols=extra_insert_cols or None,
            as_dict=True
        )
        if not claimed:
            print('No more corps available to claim')
            break

        corp_nums = [r['corp_num'] for r in claimed]
        corp_accounts = {r['corp_num']: (r.get('account_ids') or None) for r in claimed} if include_account_ids else {}

        # A token is requested once per round and shared by every task in the batch.
        try:
            token = get_auth_token(config)
        except Exception as e:
            err = f'Failed to obtain auth token: {repr(e)}'
            print(f'❌ {err}')
            # Nothing in this batch can run without a token: fail every claimed corp and stop.
            for corp_num in corp_nums:
                tracking.update_corp_status(
                    flow_run_id,
                    corp_num,
                    ProcessingStatuses.FAILED,
                    error=err,
                    action_detail='token_error',
                    **failure_actions
                )
            return Failed(message=err)

        # Keep each future paired with its corp so a crashed task can still be attributed.
        submitted = []
        for corp_num in corp_nums:
            accounts = parse_accounts_csv(corp_accounts.get(corp_num)) if include_account_ids else []
            future = perform_auth_delete_for_corp.submit(
                config,
                corp_num,
                accounts,
                plan,
                token
            )
            submitted.append((corp_num, future))

        wait([future for _, future in submitted])

        for corp_num, future in submitted:
            try:
                res = future.result()
            except Exception as e:
                # result() re-raises when the task itself crashed; record the corp as
                # failed instead of aborting the flow and stranding the rest of the batch.
                err = f'Task raised an unhandled exception: {repr(e)}'
                print(f'❌ {corp_num}: {err}')
                total_failed += 1
                tracking.update_corp_status(
                    flow_run_id,
                    corp_num,
                    ProcessingStatuses.FAILED,
                    error=err,
                    action_detail='task_exception',
                    **failure_actions
                )
                continue

            actions = [
                res.get('entity_action'),
                res.get('contact_action'),
                res.get('affiliation_action'),
                res.get('invite_action'),
            ]
            # Any single failed action marks the whole corp as failed.
            failed = any(a == 'FAILED' for a in actions if a)
            status = ProcessingStatuses.FAILED if failed else ProcessingStatuses.COMPLETED

            tracking.update_corp_status(
                flow_run_id,
                res.get('corp_num') or corp_num,
                status,
                error=res.get('error'),
                entity_action=res.get('entity_action'),
                contact_action=res.get('contact_action'),
                affiliation_action=res.get('affiliation_action'),
                invite_action=res.get('invite_action'),
                action_detail=res.get('action_detail')
            )

            if status == ProcessingStatuses.FAILED:
                total_failed += 1
            else:
                total_completed += 1

        cnt += 1
        print(f'🌟 Complete round {cnt}/{batches}. Completed={total_completed}, Failed={total_failed}')

    if total_failed > 0:
        return Failed(message=f'{total_failed} corps failed in {FLOW_NAME}.')

    print(f'🌰 {FLOW_NAME} complete. Completed={total_completed}, Failed={total_failed}')
persist_result=False, + task_runner=ConcurrentTaskRunner(max_workers=_get_max_workers()) +) +def auth_invite_flow(): + """ + Send unaffiliated invite for an entity. + + Optionally upserts contact email first (AUTH_UPSERT_CONTACT). + (Affiliations are mutually exclusive with invites; this flow does invites only.) + + Selection excludes any corp already tracked in auth_processing for (corp_num, FLOW_NAME, environment). + """ + config = get_config() + colin_engine = colin_extract_init(config) + selection_mode = _parse_selection_mode(config) + + plan = AuthCreatePlan( + create_entity=False, + upsert_contact=bool(getattr(config, 'AUTH_UPSERT_CONTACT', False)), + create_affiliations=False, + send_unaffiliated_invite=True, + fail_if_missing_email=bool(getattr(config, 'AUTH_FAIL_IF_MISSING_EMAIL', False)), + dry_run=bool(getattr(config, 'AUTH_DRY_RUN', False)), + ) + + count_sql = get_auth_reservable_count_query( + flow_name=FLOW_NAME, + config=config, + selection_mode=selection_mode + ) + with colin_engine.connect() as conn: + total_reservable = int(conn.execute(text(count_sql)).scalar() or 0) + + if total_reservable <= 0: + print('No reservable corps found for this run.') + return + + if getattr(config, 'AUTH_BATCHES', 0) <= 0: + raise ValueError('AUTH_BATCHES must be explicitly set to a positive integer') + if getattr(config, 'AUTH_BATCH_SIZE', 0) <= 0: + raise ValueError('AUTH_BATCH_SIZE must be explicitly set to a positive integer') + + batch_size = config.AUTH_BATCH_SIZE + max_corps = min(total_reservable, config.AUTH_BATCHES * config.AUTH_BATCH_SIZE) + + flow_run_id = get_run_context().flow_run.id + + tracking = ExtractTrackingService( + config.DATA_LOAD_ENV, + colin_engine, + FLOW_NAME, + table_name='auth_processing', + statement_timeout_ms=getattr(config, 'RESERVE_STATEMENT_TIMEOUT_MS', None) + ) + + extra_insert_cols = ['contact_email'] + + base_query = get_auth_reservable_corps_query( + flow_name=FLOW_NAME, + config=config, + batch_size=max_corps, + 
selection_mode=selection_mode, + include_account_ids=False, + include_contact_email=True + ) + + reserved = tracking.reserve_for_flow( + base_query=base_query, + flow_run_id=flow_run_id, + extra_insert_cols=extra_insert_cols + ) + + if reserved <= 0: + print('No corps reserved (cohort may be exhausted or already reserved).') + return + + batches = min(math.ceil(reserved / batch_size), config.AUTH_BATCHES) + + print(f'👷 Auth invite plan: {plan}') + print(f'👷 Reservable={total_reservable}, Reserved={reserved}, Batches={batches}, BatchSize={batch_size}') + print(f'👷 SelectionMode={selection_mode.value}, DryRun={plan.dry_run}') + + cnt = 0 + total_failed = 0 + total_completed = 0 + + while cnt < batches: + claimed = tracking.claim_batch( + flow_run_id, + batch_size, + extra_return_cols=extra_insert_cols, + as_dict=True + ) + if not claimed: + print('No more corps available to claim') + break + + corp_nums = [r['corp_num'] for r in claimed] + profiles = _fetch_profiles(colin_engine, corp_nums, getattr(config, 'CORP_NAME_SUFFIX', '') or '') + + try: + token = get_auth_token(config) + except Exception as e: + err = f'Failed to obtain auth token: {repr(e)}' + print(f'❌ {err}') + for corp_num in corp_nums: + tracking.update_corp_status( + flow_run_id, + corp_num, + ProcessingStatuses.FAILED, + error=err, + entity_action='NOT_RUN', + contact_action='FAILED' if plan.upsert_contact else 'NOT_RUN', + affiliation_action='NOT_RUN', + invite_action='FAILED', + action_detail='token_error' + ) + return Failed(message=err) + + futures = [] + for corp_num in corp_nums: + profile = profiles.get(corp_num) + if not profile: + total_failed += 1 + tracking.update_corp_status( + flow_run_id, + corp_num, + ProcessingStatuses.FAILED, + error='Missing business profile for corp in COLIN extract', + entity_action='NOT_RUN', + contact_action='FAILED' if plan.upsert_contact else 'NOT_RUN', + affiliation_action='NOT_RUN', + invite_action='FAILED', + action_detail='profile_missing' + ) + continue + 
+ futures.append( + perform_auth_create_for_corp.submit( + config, + corp_num, + profile, + [], + plan, + token + ) + ) + + wait(futures) + + for f in futures: + res = f.result() + actions = [ + res.get('contact_action'), + res.get('invite_action'), + ] + failed = any(a == 'FAILED' for a in actions if a) + status = ProcessingStatuses.FAILED if failed else ProcessingStatuses.COMPLETED + + tracking.update_corp_status( + flow_run_id, + res['corp_num'], + status, + error=res.get('error'), + entity_action=res.get('entity_action'), + contact_action=res.get('contact_action'), + affiliation_action=res.get('affiliation_action'), + invite_action=res.get('invite_action'), + action_detail=res.get('action_detail') + ) + + if status == ProcessingStatuses.FAILED: + total_failed += 1 + else: + total_completed += 1 + + cnt += 1 + print(f'🌟 Complete round {cnt}/{batches}. Completed={total_completed}, Failed={total_failed}') + + if total_failed > 0: + return Failed(message=f'{total_failed} corps failed in {FLOW_NAME}.') + + print(f'🌰 {FLOW_NAME} complete. 
Completed={total_completed}, Failed={total_failed}') + + +if __name__ == '__main__': + auth_invite_flow() diff --git a/data-tool/flows/auth/auth_models.py b/data-tool/flows/auth/auth_models.py new file mode 100644 index 0000000000..673d6afdff --- /dev/null +++ b/data-tool/flows/auth/auth_models.py @@ -0,0 +1,37 @@ +from dataclasses import dataclass +from enum import Enum + + +class AuthSelectionMode(str, Enum): + """How an auth flow selects candidate corps to process.""" + MANUAL = "MANUAL" + MIGRATION_FILTER = "MIGRATION_FILTER" + CORP_PROCESSING = "CORP_PROCESSING" + + +class AuthComponentStatus(str, Enum): + """Per-component outcome stored in auth_processing.""" + SUCCESS = "SUCCESS" + SKIPPED = "SKIPPED" + FAILED = "FAILED" + NOT_RUN = "NOT_RUN" + + +@dataclass(frozen=True) +class AuthCreatePlan: + """Immutable plan for create-like auth flows.""" + create_entity: bool = True + upsert_contact: bool = False + create_affiliations: bool = False + send_unaffiliated_invite: bool = False + fail_if_missing_email: bool = False + dry_run: bool = False + + +@dataclass(frozen=True) +class AuthDeletePlan: + """Immutable plan for delete-like auth flows.""" + delete_affiliations: bool = False + delete_entity: bool = False + delete_invites: bool = False # only if supported by API + dry_run: bool = False diff --git a/data-tool/flows/auth/auth_queries.py b/data-tool/flows/auth/auth_queries.py new file mode 100644 index 0000000000..b6d81f4d0e --- /dev/null +++ b/data-tool/flows/auth/auth_queries.py @@ -0,0 +1,317 @@ +from __future__ import annotations + +from typing import List, Sequence + +from .auth_models import AuthSelectionMode + +# Match the existing tombstone corp type cohort by default. 
+CORP_TYPE_FILTER = "('BC', 'C', 'ULC', 'CUL', 'CC', 'CCC', 'QA', 'QB', 'QC', 'QD', 'QE')" + + +def _escape_sql_literal(val: str) -> str: + """Escape a value for safe embedding in a SQL single-quoted literal.""" + return (val or "").replace("'", "''") + + +def _quote_list(values: Sequence[str]) -> str: + """Quote/escape a list of values for SQL IN (...) lists.""" + return ", ".join([f"'{_escape_sql_literal(v)}'" for v in values]) + + +def _parse_corp_nums_csv(csv_val: str | None) -> List[str]: + if not csv_val: + return [] + parts: List[str] = [] + for tok in str(csv_val).split(','): + t = tok.strip().upper() + if t: + parts.append(t) + # Deduplicate while preserving order + seen = set() + out: List[str] = [] + for t in parts: + if t not in seen: + seen.add(t) + out.append(t) + return out + + +def get_auth_reservable_corps_query( + *, + flow_name: str, + config, + batch_size: int, + selection_mode: AuthSelectionMode, + include_account_ids: bool = False, + include_contact_email: bool = False, +) -> str: + """ + Build SQL to select corps eligible to be reserved for an auth flow. + + Contract: Must SELECT at least: + - corp_num + - corp_type_cd + - mig_batch_id + + Optionally selects: + - account_ids (CSV string) + - contact_email + + IMPORTANT: Always excludes corps already present in auth_processing for the same + (corp_num, flow_name, environment). + """ + environment = getattr(config, 'DATA_LOAD_ENV', '') + mig_group_ids = getattr(config, 'MIG_GROUP_IDS', None) + mig_batch_ids = getattr(config, 'MIG_BATCH_IDS', None) + source_flow = getattr(config, 'AUTH_SOURCE_FLOW_NAME', 'tombstone-flow') + + # Optional select columns + account_map_cte = "" + account_join = "" + account_select = "" + if include_account_ids: + if selection_mode == AuthSelectionMode.MIGRATION_FILTER: + # account_map is expensive; only include when requested. 
+ account_map_cte = f""" + WITH account_map AS ( + SELECT mca.corp_num, + array_to_string(array_agg(DISTINCT mca.account_id ORDER BY mca.account_id), ',') AS account_ids + FROM mig_corp_account mca + JOIN mig_batch b2 ON b2.id = mca.mig_batch_id + WHERE mca.target_environment = '{_escape_sql_literal(environment)}' + {f" AND b2.id IN ({mig_batch_ids})" if mig_batch_ids else ""} + {f" AND b2.mig_group_id IN ({mig_group_ids})" if mig_group_ids else ""} + GROUP BY mca.corp_num + ) + """ + account_join = "LEFT JOIN account_map am ON am.corp_num = c.corp_num" + account_select = ", COALESCE(am.account_ids, NULL::varchar(100)) AS account_ids" + elif selection_mode == AuthSelectionMode.CORP_PROCESSING: + account_select = ", cp.account_ids AS account_ids" + else: + # MANUAL + account_select = ", NULL::varchar(100) AS account_ids" + + contact_select = ", c.admin_email AS contact_email" if include_contact_email else "" + + ap_join = f""" + LEFT JOIN auth_processing ap + ON ap.corp_num = c.corp_num + AND ap.flow_name = '{_escape_sql_literal(flow_name)}' + AND ap.environment = '{_escape_sql_literal(environment)}' + """ + + # MIG filters for MIGRATION_FILTER mode + mig_extra_where = "" + if selection_mode == AuthSelectionMode.MIGRATION_FILTER: + if mig_batch_ids: + mig_extra_where += f" AND b.id IN ({mig_batch_ids})" + if mig_group_ids: + mig_extra_where += f" AND g.id IN ({mig_group_ids})" + + if selection_mode == AuthSelectionMode.MIGRATION_FILTER: + query = f""" + {account_map_cte} + SELECT + c.corp_num, + c.corp_type_cd, + b.id AS mig_batch_id + {account_select} + {contact_select} + FROM mig_corp_batch mcb + JOIN mig_batch b ON b.id = mcb.mig_batch_id + JOIN mig_group g ON g.id = b.mig_group_id + JOIN corporation c ON c.corp_num = mcb.corp_num + JOIN corp_state cs + ON cs.corp_num = c.corp_num + AND cs.end_event_id IS NULL + {account_join} + {ap_join} + WHERE 1=1 + {mig_extra_where} + AND c.corp_type_cd IN {CORP_TYPE_FILTER} + AND ap.corp_num IS NULL + LIMIT {int(batch_size)} 
+ """ + return query + + if selection_mode == AuthSelectionMode.CORP_PROCESSING: + query = f""" + SELECT + cp.corp_num, + c.corp_type_cd, + cp.mig_batch_id AS mig_batch_id + {account_select} + {contact_select} + FROM corp_processing cp + JOIN corporation c ON c.corp_num = cp.corp_num + JOIN corp_state cs + ON cs.corp_num = c.corp_num + AND cs.end_event_id IS NULL + {ap_join} + WHERE 1=1 + AND cp.flow_name = '{_escape_sql_literal(source_flow)}' + AND cp.environment = '{_escape_sql_literal(environment)}' + AND cp.processed_status IN ('COMPLETED', 'PARTIAL') + AND c.corp_type_cd IN {CORP_TYPE_FILTER} + AND ap.corp_num IS NULL + LIMIT {int(batch_size)} + """ + return query + + # MANUAL + corp_nums = _parse_corp_nums_csv(getattr(config, 'AUTH_CORP_NUMS', '')) + corp_filter = f"AND c.corp_num IN ({_quote_list(corp_nums)})" if corp_nums else "AND 1=0" + + query = f""" + SELECT + c.corp_num, + c.corp_type_cd, + NULL::integer AS mig_batch_id + {account_select} + {contact_select} + FROM corporation c + JOIN corp_state cs + ON cs.corp_num = c.corp_num + AND cs.end_event_id IS NULL + {ap_join} + WHERE 1=1 + {corp_filter} + AND c.corp_type_cd IN {CORP_TYPE_FILTER} + AND ap.corp_num IS NULL + LIMIT {int(batch_size)} + """ + return query + + +def get_auth_reservable_count_query( + *, + flow_name: str, + config, + selection_mode: AuthSelectionMode, +) -> str: + """ + Count corps eligible to be reserved for this auth flow. + + Must match get_auth_reservable_corps_query(...) 
filters, including: + - selection_mode logic + - MIG filters (when enabled) + - corp type filter + - corp_state current row (end_event_id IS NULL) + - exclusion of existing auth_processing rows for (corp_num, flow_name, environment) + """ + environment = getattr(config, 'DATA_LOAD_ENV', '') + mig_group_ids = getattr(config, 'MIG_GROUP_IDS', None) + mig_batch_ids = getattr(config, 'MIG_BATCH_IDS', None) + source_flow = getattr(config, 'AUTH_SOURCE_FLOW_NAME', 'tombstone-flow') + + ap_join = f""" + LEFT JOIN auth_processing ap + ON ap.corp_num = c.corp_num + AND ap.flow_name = '{_escape_sql_literal(flow_name)}' + AND ap.environment = '{_escape_sql_literal(environment)}' + """ + + mig_extra_where = "" + if selection_mode == AuthSelectionMode.MIGRATION_FILTER: + if mig_batch_ids: + mig_extra_where += f" AND b.id IN ({mig_batch_ids})" + if mig_group_ids: + mig_extra_where += f" AND g.id IN ({mig_group_ids})" + + if selection_mode == AuthSelectionMode.MIGRATION_FILTER: + return f""" + SELECT count(*) + FROM mig_corp_batch mcb + JOIN mig_batch b ON b.id = mcb.mig_batch_id + JOIN mig_group g ON g.id = b.mig_group_id + JOIN corporation c ON c.corp_num = mcb.corp_num + JOIN corp_state cs + ON cs.corp_num = c.corp_num + AND cs.end_event_id IS NULL + {ap_join} + WHERE 1=1 + {mig_extra_where} + AND c.corp_type_cd IN {CORP_TYPE_FILTER} + AND ap.corp_num IS NULL + """ + + if selection_mode == AuthSelectionMode.CORP_PROCESSING: + return f""" + SELECT count(*) + FROM corp_processing cp + JOIN corporation c ON c.corp_num = cp.corp_num + JOIN corp_state cs + ON cs.corp_num = c.corp_num + AND cs.end_event_id IS NULL + {ap_join} + WHERE 1=1 + AND cp.flow_name = '{_escape_sql_literal(source_flow)}' + AND cp.environment = '{_escape_sql_literal(environment)}' + AND cp.processed_status IN ('COMPLETED', 'PARTIAL') + AND c.corp_type_cd IN {CORP_TYPE_FILTER} + AND ap.corp_num IS NULL + """ + + corp_nums = _parse_corp_nums_csv(getattr(config, 'AUTH_CORP_NUMS', '')) + corp_filter = f"AND 
c.corp_num IN ({_quote_list(corp_nums)})" if corp_nums else "AND 1=0" + return f""" + SELECT count(*) + FROM corporation c + JOIN corp_state cs + ON cs.corp_num = c.corp_num + AND cs.end_event_id IS NULL + {ap_join} + WHERE 1=1 + {corp_filter} + AND c.corp_type_cd IN {CORP_TYPE_FILTER} + AND ap.corp_num IS NULL + """ + + +def get_auth_business_profiles_query(corp_nums: List[str], suffix: str) -> str: + """ + Batch-friendly business profile lookup for Auth API actions. + + Returns one row per corp_num with: + - identifier (corp_num) + - legal_name (current CO/NB corp_name + suffix) + - legal_type (corp_type_cd) + - admin_email (corporation.admin_email) + - pass_code (corporation.corp_password) + + NOTE: Callers must never log or persist pass_code. + """ + if not corp_nums: + return """ + SELECT + NULL::varchar(10) AS identifier, + NULL::varchar(150) AS legal_name, + NULL::varchar(3) AS legal_type, + NULL::varchar(254) AS admin_email, + NULL::varchar(300) AS pass_code + WHERE 1=0; + """ + + corp_nums_up = [c.strip().upper() for c in corp_nums if c and c.strip()] + corp_nums_sql = _quote_list(corp_nums_up) + suffix_sql = _escape_sql_literal(suffix or "") + + return f""" + SELECT DISTINCT ON (c.corp_num) + c.corp_num AS identifier, + (COALESCE(NULLIF(trim(cn.corp_name), ''), c.corp_num) || '{suffix_sql}') AS legal_name, + c.corp_type_cd AS legal_type, + c.admin_email AS admin_email, + c.corp_password AS pass_code + FROM corporation c + LEFT JOIN corp_name cn + ON cn.corp_num = c.corp_num + AND cn.end_event_id IS NULL + AND cn.corp_name_typ_cd IN ('CO', 'NB') + WHERE c.corp_num IN ({corp_nums_sql}) + ORDER BY + c.corp_num, + CASE cn.corp_name_typ_cd WHEN 'CO' THEN 0 WHEN 'NB' THEN 1 ELSE 2 END, + cn.start_event_id DESC NULLS LAST; + """ diff --git a/data-tool/flows/auth/auth_tasks.py b/data-tool/flows/auth/auth_tasks.py new file mode 100644 index 0000000000..3592cfd2f8 --- /dev/null +++ b/data-tool/flows/auth/auth_tasks.py @@ -0,0 +1,369 @@ +from __future__ import 
annotations + +from http import HTTPStatus +from typing import Any, Dict, List + +from prefect import task +from prefect.cache_policies import NO_CACHE + +from common.auth_service import AuthService +from .auth_models import AuthComponentStatus, AuthCreatePlan, AuthDeletePlan + + +def parse_accounts_csv(csv_val: str | None) -> List[int]: + """Parse a CSV list of account IDs into a sorted, de-duped list of ints.""" + if not csv_val: + return [] + out: List[int] = [] + for tok in str(csv_val).split(','): + t = tok.strip() + if t.isdigit(): + out.append(int(t)) + return sorted(set(out)) + + +def parse_corp_nums_csv(csv_val: str | None) -> List[str]: + """Parse a CSV list of corp numbers into a de-duped list (preserving order).""" + if not csv_val: + return [] + parts: List[str] = [] + for tok in str(csv_val).replace('\n', ',').split(','): + t = tok.strip().upper() + if t: + parts.append(t) + seen = set() + out: List[str] = [] + for t in parts: + if t not in seen: + seen.add(t) + out.append(t) + return out + + +def _status_code(val: Any) -> int: + try: + return int(val) + except Exception: + return 0 + + +def _truncate(val: str, max_len: int) -> str: + if val is None: + return '' + if len(val) <= max_len: + return val + return val[: max_len - 3] + '...' + + +@task(cache_policy=NO_CACHE) +def get_auth_token(config) -> str: + """Get one bearer token to reuse for an entire claimed batch.""" + token = AuthService.get_bearer_token(config) + if not token: + raise Exception("Unable to obtain auth token") + return token + + +@task(cache_policy=NO_CACHE) +def perform_auth_create_for_corp( + config, + corp_num: str, + profile: Dict[str, Any], + account_ids: List[int], + plan: AuthCreatePlan, + token: str +) -> Dict[str, Any]: + """ + Execute auth create-like operations for a single corp based on plan. 
+ + Returns a dict suitable for persisting to auth_processing: + - entity_action/contact_action/affiliation_action/invite_action + - action_detail (short) + - error (short, safe) + """ + entity_action = AuthComponentStatus.NOT_RUN + contact_action = AuthComponentStatus.NOT_RUN + affiliation_action = AuthComponentStatus.NOT_RUN + invite_action = AuthComponentStatus.NOT_RUN + + detail_parts: List[str] = [] + errors: List[str] = [] + + try: + identifier = (profile or {}).get('identifier') or corp_num + legal_name = (profile or {}).get('legal_name') or identifier + legal_type = (profile or {}).get('legal_type') or (profile or {}).get('corp_type_cd') or '' + + # NEVER log or persist pass_code (secret). + pass_code = (profile or {}).get('pass_code') or '' + if getattr(config, 'USE_CUSTOM_PASSCODE', False) and getattr(config, 'CUSTOM_PASSCODE', ''): + pass_code = getattr(config, 'CUSTOM_PASSCODE') + + # Determine contact email (safe to store) + email = (profile or {}).get('admin_email') or (profile or {}).get('contact_email') or '' + if getattr(config, 'USE_CUSTOM_CONTACT_EMAIL', False) and getattr(config, 'CUSTOM_CONTACT_EMAIL', ''): + email = getattr(config, 'CUSTOM_CONTACT_EMAIL') + + # Mutual exclusion: affiliations vs invite + if plan.create_affiliations and plan.send_unaffiliated_invite: + affiliation_action = AuthComponentStatus.FAILED + invite_action = AuthComponentStatus.FAILED + errors.append('plan_conflict_affiliations_vs_invite') + detail_parts.append('plan:invalid(affiliations+invite)') + return { + 'corp_num': corp_num, + 'entity_action': entity_action.value, + 'contact_action': contact_action.value, + 'affiliation_action': affiliation_action.value, + 'invite_action': invite_action.value, + 'action_detail': _truncate('; '.join(detail_parts), 2000), + 'error': _truncate('; '.join(errors), 1000) or None + } + + # 1) Create entity + if plan.create_entity: + if plan.dry_run: + entity_action = AuthComponentStatus.SKIPPED + detail_parts.append('entity:DRY_RUN') + 
else: + status = AuthService.create_entity( + config=config, + business_registration=identifier, + business_name=legal_name, + corp_type_code=legal_type, + pass_code=pass_code, + token=token + ) + code = _status_code(status) + detail_parts.append(f'entity:{code}') + if code == int(HTTPStatus.OK): + entity_action = AuthComponentStatus.SUCCESS + else: + entity_action = AuthComponentStatus.FAILED + errors.append(f'create_entity:{code}') + + # 2) Upsert contact + if plan.upsert_contact: + if not email: + detail_parts.append('contact:missing_email') + if plan.fail_if_missing_email: + contact_action = AuthComponentStatus.FAILED + errors.append('missing_email_for_contact') + else: + contact_action = AuthComponentStatus.SKIPPED + elif plan.dry_run: + contact_action = AuthComponentStatus.SKIPPED + detail_parts.append('contact:DRY_RUN') + else: + status = AuthService.update_contact_email( + config=config, + identifier=identifier, + email=email, + token=token + ) + code = _status_code(status) + detail_parts.append(f'contact:{code}') + if code == int(HTTPStatus.OK): + contact_action = AuthComponentStatus.SUCCESS + else: + contact_action = AuthComponentStatus.FAILED + errors.append(f'upsert_contact:{code}') + + # 3) Create affiliations + if plan.create_affiliations: + if not account_ids: + affiliation_action = AuthComponentStatus.SKIPPED + detail_parts.append('affiliations:no_accounts') + elif plan.dry_run: + affiliation_action = AuthComponentStatus.SKIPPED + detail_parts.append(f'affiliations:DRY_RUN({len(account_ids)})') + else: + ok = 0 + fail = 0 + for acct in sorted(set(account_ids)): + status = AuthService.create_affiliation( + config=config, + account=acct, + business_registration=identifier, + business_name=legal_name, + corp_type_code=legal_type, + pass_code=pass_code, + details={'identifier': identifier}, + token=token + ) + code = _status_code(status) + if code == int(HTTPStatus.OK): + ok += 1 + else: + fail += 1 + detail_parts.append(f'affiliations:ok{ok} 
fail{fail}') + if fail > 0: + affiliation_action = AuthComponentStatus.FAILED + errors.append(f'create_affiliations_failed:{fail}') + else: + affiliation_action = AuthComponentStatus.SUCCESS + + # 4) Send unaffiliated invite + if plan.send_unaffiliated_invite: + if not email: + detail_parts.append('invite:missing_email') + if plan.fail_if_missing_email: + invite_action = AuthComponentStatus.FAILED + errors.append('missing_email_for_invite') + else: + invite_action = AuthComponentStatus.SKIPPED + elif plan.dry_run: + invite_action = AuthComponentStatus.SKIPPED + detail_parts.append('invite:DRY_RUN') + else: + status = AuthService.send_unaffiliated_email( + config=config, + identifier=identifier, + email=email, + token=token + ) + code = _status_code(status) + detail_parts.append(f'invite:{code}') + if code == int(HTTPStatus.OK): + invite_action = AuthComponentStatus.SUCCESS + else: + invite_action = AuthComponentStatus.FAILED + errors.append(f'send_invite:{code}') + + except Exception as e: + detail_parts.append('exception') + errors.append(_truncate(repr(e), 900)) + if plan.create_entity and entity_action == AuthComponentStatus.NOT_RUN: + entity_action = AuthComponentStatus.FAILED + if plan.upsert_contact and contact_action == AuthComponentStatus.NOT_RUN: + contact_action = AuthComponentStatus.FAILED + if plan.create_affiliations and affiliation_action == AuthComponentStatus.NOT_RUN: + affiliation_action = AuthComponentStatus.FAILED + if plan.send_unaffiliated_invite and invite_action == AuthComponentStatus.NOT_RUN: + invite_action = AuthComponentStatus.FAILED + + action_detail = _truncate('; '.join(detail_parts), 2000) + error = _truncate('; '.join(errors), 1000) if errors else None + + return { + 'corp_num': corp_num, + 'entity_action': entity_action.value, + 'contact_action': contact_action.value, + 'affiliation_action': affiliation_action.value, + 'invite_action': invite_action.value, + 'action_detail': action_detail, + 'error': error + } + + 
+@task(cache_policy=NO_CACHE) +def perform_auth_delete_for_corp( + config, + corp_num: str, + account_ids: List[int], + plan: AuthDeletePlan, + token: str +) -> Dict[str, Any]: + """ + Execute auth delete-like operations for a single corp based on plan. + + IMPORTANT: Uses explicit delete primitives (no combined delete_affiliation()). + + Returns a dict suitable for persisting to auth_processing. + """ + identifier = corp_num + + entity_action = AuthComponentStatus.NOT_RUN + contact_action = AuthComponentStatus.NOT_RUN + affiliation_action = AuthComponentStatus.NOT_RUN + invite_action = AuthComponentStatus.NOT_RUN + + detail_parts: List[str] = [] + errors: List[str] = [] + + try: + # 1) Delete affiliations (best-effort across accounts) + if plan.delete_affiliations: + if not account_ids: + affiliation_action = AuthComponentStatus.SKIPPED + detail_parts.append('del_affiliations:no_accounts') + elif plan.dry_run: + affiliation_action = AuthComponentStatus.SKIPPED + detail_parts.append(f'del_affiliations:DRY_RUN({len(account_ids)})') + else: + ok = 0 + not_found = 0 + fail = 0 + for acct in sorted(set(account_ids)): + status = AuthService.delete_affiliation_only( + config=config, + account=acct, + identifier=identifier, + token=token + ) + code = _status_code(status) + if code in (int(HTTPStatus.OK), int(HTTPStatus.NO_CONTENT)): + ok += 1 + elif code == int(HTTPStatus.NOT_FOUND): + not_found += 1 + else: + fail += 1 + + detail_parts.append(f'del_affiliations:ok{ok} nf{not_found} fail{fail}') + if fail > 0: + affiliation_action = AuthComponentStatus.FAILED + errors.append(f'delete_affiliations_failed:{fail}') + elif ok > 0: + affiliation_action = AuthComponentStatus.SUCCESS + else: + affiliation_action = AuthComponentStatus.SKIPPED + + # 2) Delete invites (NOT supported by current client/API) + if plan.delete_invites: + invite_action = AuthComponentStatus.FAILED + detail_parts.append('del_invites:UNSUPPORTED') + errors.append('delete_invites_not_supported') + + # 3) 
Delete entity (after affiliations) + if plan.delete_entity: + if plan.dry_run: + entity_action = AuthComponentStatus.SKIPPED + detail_parts.append('del_entity:DRY_RUN') + else: + status = AuthService.delete_entity( + config=config, + identifier=identifier, + token=token + ) + code = _status_code(status) + detail_parts.append(f'del_entity:{code}') + if code in (int(HTTPStatus.OK), int(HTTPStatus.NO_CONTENT)): + entity_action = AuthComponentStatus.SUCCESS + elif code == int(HTTPStatus.NOT_FOUND): + entity_action = AuthComponentStatus.SKIPPED + else: + entity_action = AuthComponentStatus.FAILED + errors.append(f'delete_entity:{code}') + + except Exception as e: + detail_parts.append('exception') + errors.append(_truncate(repr(e), 900)) + if plan.delete_affiliations and affiliation_action == AuthComponentStatus.NOT_RUN: + affiliation_action = AuthComponentStatus.FAILED + if plan.delete_invites and invite_action == AuthComponentStatus.NOT_RUN: + invite_action = AuthComponentStatus.FAILED + if plan.delete_entity and entity_action == AuthComponentStatus.NOT_RUN: + entity_action = AuthComponentStatus.FAILED + + action_detail = _truncate('; '.join(detail_parts), 2000) + error = _truncate('; '.join(errors), 1000) if errors else None + + return { + 'corp_num': corp_num, + 'entity_action': entity_action.value, + 'contact_action': contact_action.value, + 'affiliation_action': affiliation_action.value, + 'invite_action': invite_action.value, + 'action_detail': action_detail, + 'error': error + } diff --git a/data-tool/flows/common/auth_service.py b/data-tool/flows/common/auth_service.py index f7a468f7e5..4480982541 100644 --- a/data-tool/flows/common/auth_service.py +++ b/data-tool/flows/common/auth_service.py @@ -1,26 +1,23 @@ import json from http import HTTPStatus -from typing import Dict +from typing import Optional import requests class AuthService: - """Wrapper to call Authentication Services. 
- """ + """Wrapper to call Authentication Services.""" BEARER: str = 'Bearer ' CONTENT_TYPE_JSON = {'Content-Type': 'application/json'} @classmethod - def get_time_out(cls, config): - try: - timeout = int(config.ACCOUNT_SVC_TIMEOUT, 20) - except Exception: - timeout = 20 + def get_time_out(cls, config) -> int: + """Return request timeout (seconds).""" + return getattr(config, 'ACCOUNT_SVC_TIMEOUT', None) or 20 @classmethod - def get_bearer_token(cls, config): + def get_bearer_token(cls, config) -> Optional[str]: """Get a valid Bearer token for the service to use.""" token_url = config.ACCOUNT_SVC_AUTH_URL client_id = config.ACCOUNT_SVC_CLIENT_ID @@ -29,36 +26,45 @@ def get_bearer_token(cls, config): data = 'grant_type=client_credentials' # get service account token - res = requests.post(url=token_url, - data=data, - headers={'content-type': 'application/x-www-form-urlencoded'}, - auth=(client_id, client_secret), - timeout=cls.get_time_out(config)) + res = requests.post( + url=token_url, + data=data, + headers={'content-type': 'application/x-www-form-urlencoded'}, + auth=(client_id, client_secret), + timeout=cls.get_time_out(config) + ) try: return res.json().get('access_token') except Exception: return None + @classmethod + def _resolve_token(cls, config, token: str | None) -> Optional[str]: + """Use provided token, or fetch a new one.""" + return token or cls.get_bearer_token(config) # pylint: disable=too-many-arguments, too-many-locals disable=invalid-name; @classmethod - def create_affiliation(cls, - config, - account: int, - business_registration: str, - business_name: str = None, - corp_type_code: str = 'TMP', - corp_sub_type_code: str = None, - pass_code: str = '', - details: dict = None): + def create_affiliation( + cls, + config, + account: int, + business_registration: str, + business_name: str = None, + corp_type_code: str = 'TMP', + corp_sub_type_code: str = None, + pass_code: str = '', + details: dict = None, + *, + token: str | None = None + ) -> 
HTTPStatus: """Affiliate a business to an account.""" auth_url = config.AUTH_SVC_URL account_svc_entity_url = f'{auth_url}/entities' account_svc_affiliate_url = f'{auth_url}/orgs/{account}/affiliations' - token = cls.get_bearer_token(config) - + token = cls._resolve_token(config, token) if not token: return HTTPStatus.UNAUTHORIZED @@ -90,6 +96,7 @@ def create_affiliation(cls, affiliate_data['passCode'] = pass_code if details: affiliate_data['entityDetails'] = details + affiliate = requests.post( url=account_svc_affiliate_url, headers={**cls.CONTENT_TYPE_JSON, @@ -98,30 +105,48 @@ def create_affiliation(cls, timeout=cls.get_time_out(config) ) - if ( - affiliate.status_code != HTTPStatus.CREATED - or entity_record.status_code not in (HTTPStatus.ACCEPTED, HTTPStatus.CREATED) - ): + # Be tolerant of idempotent responses (existing entity/affiliation). + entity_ok = entity_record.status_code in ( + HTTPStatus.ACCEPTED, + HTTPStatus.CREATED, + HTTPStatus.OK, + HTTPStatus.CONFLICT + ) + if entity_record.status_code == HTTPStatus.BAD_REQUEST and 'DATA_ALREADY_EXISTS' in entity_record.text: + entity_ok = True + + affiliate_ok = affiliate.status_code in ( + HTTPStatus.CREATED, + HTTPStatus.OK, + HTTPStatus.CONFLICT + ) + if affiliate.status_code == HTTPStatus.BAD_REQUEST and 'DATA_ALREADY_EXISTS' in affiliate.text: + affiliate_ok = True + + if not (entity_ok and affiliate_ok): return HTTPStatus.BAD_REQUEST + return HTTPStatus.OK @classmethod - def create_entity(cls, - config, - business_registration: str, - business_name: str, - corp_type_code: str, - pass_code: str = ''): - """Update an entity.""" + def create_entity( + cls, + config, + business_registration: str, + business_name: str, + corp_type_code: str, + pass_code: str = '', + *, + token: str | None = None + ) -> HTTPStatus: + """Create an entity.""" auth_url = config.AUTH_SVC_URL account_svc_entity_url = f'{auth_url}/entities' - token = cls.get_bearer_token(config) - + token = cls._resolve_token(config, token) if not 
token: return HTTPStatus.UNAUTHORIZED - # Create an entity record entity_data = { 'businessIdentifier': business_registration, 'corpTypeCode': corp_type_code, @@ -139,28 +164,31 @@ def create_entity(cls, timeout=cls.get_time_out(config) ) - if entity_record.status_code not in (HTTPStatus.ACCEPTED, HTTPStatus.CREATED): - return HTTPStatus.BAD_REQUEST - return HTTPStatus.OK - + if entity_record.status_code in (HTTPStatus.ACCEPTED, HTTPStatus.CREATED, HTTPStatus.OK, HTTPStatus.CONFLICT): + return HTTPStatus.OK + if entity_record.status_code == HTTPStatus.BAD_REQUEST and 'DATA_ALREADY_EXISTS' in entity_record.text: + return HTTPStatus.OK + return HTTPStatus.BAD_REQUEST @classmethod - def update_entity(cls, - config, - business_registration: str, - business_name: str, - corp_type_code: str, - state: str = None): + def update_entity( + cls, + config, + business_registration: str, + business_name: str, + corp_type_code: str, + state: str = None, + *, + token: str | None = None + ) -> HTTPStatus: """Update an entity.""" auth_url = config.AUTH_SVC_URL account_svc_entity_url = f'{auth_url}/entities' - token = cls.get_bearer_token(config) - + token = cls._resolve_token(config, token) if not token: return HTTPStatus.UNAUTHORIZED - # Create an entity record entity_data = { 'businessIdentifier': business_registration, 'corpTypeCode': corp_type_code, @@ -182,24 +210,22 @@ def update_entity(cls, return HTTPStatus.OK @classmethod - def delete_affiliation(cls, config, account: int, business_registration: str) -> Dict: - """Affiliate a business to an account. 
@classmethod
def _delete_and_map_status(cls, config, url: str, token: str | None) -> HTTPStatus:
    """Issue an authenticated DELETE to *url* and return the response status.

    Args:
        config: configuration object passed through to ``_resolve_token`` and ``get_time_out``.
        url: fully built resource URL to delete.
        token: caller-supplied bearer token, or None to let ``_resolve_token`` decide.

    Returns:
        ``HTTPStatus.UNAUTHORIZED`` when no token could be resolved, otherwise the
        response status as an ``HTTPStatus`` (``BAD_REQUEST`` for non-standard codes).
    """
    token = cls._resolve_token(config, token)
    if not token:
        return HTTPStatus.UNAUTHORIZED

    rv = requests.delete(
        url=url,
        headers={**cls.CONTENT_TYPE_JSON,
                 'Authorization': cls.BEARER + token},
        timeout=cls.get_time_out(config)
    )
    try:
        return HTTPStatus(rv.status_code)
    except ValueError:
        # HTTPStatus() raises ValueError for codes that are not members of the enum;
        # report those as a generic failure instead of crashing the caller.
        return HTTPStatus.BAD_REQUEST

@classmethod
def delete_entity(cls, config, identifier: str, *, token: str | None = None) -> HTTPStatus:
    """Delete an entity ONLY.

    Sends ``DELETE {AUTH_SVC_URL}/entities/{identifier}`` and returns the raw response
    status; callers decide which codes (e.g. NOT_FOUND) count as success.
    """
    auth_url = config.AUTH_SVC_URL
    account_svc_entity_url = f'{auth_url}/entities'
    return cls._delete_and_map_status(config, f'{account_svc_entity_url}/{identifier}', token)

@classmethod
def delete_affiliation_only(cls, config, account: int, identifier: str, *, token: str | None = None) -> HTTPStatus:
    """Delete an affiliation ONLY (does not delete the entity).

    Sends ``DELETE {AUTH_SVC_URL}/orgs/{account}/affiliations/{identifier}`` and returns
    the raw response status; callers decide which codes count as success.
    """
    auth_url = config.AUTH_SVC_URL
    account_svc_affiliate_url = f'{auth_url}/orgs/{account}/affiliations'
    return cls._delete_and_map_status(config, f'{account_svc_affiliate_url}/{identifier}', token)
def _get_int(name: str, default: int = 0) -> int:
    """Read a non-negative integer from the environment variable *name*.

    Returns *default* when the variable is unset, empty, or anything other than a
    plain run of decimal digits (so negative numbers and values like ``'1.5'`` fall
    back to *default*). Surrounding whitespace is ignored.
    """
    val = (os.getenv(name) or '').strip()
    # isdecimal() only accepts characters that int() can parse. isnumeric() also
    # accepts e.g. superscripts and fractions, for which int() raises ValueError.
    return int(val) if val.isdecimal() else default


def _get_bool(name: str, default: bool = False) -> bool:
    """Read a boolean from the environment variable *name*.

    Returns *default* when the variable is unset. Otherwise the result is True only
    when the value, ignoring case and surrounding whitespace, is exactly ``'true'``.
    """
    val = os.getenv(name)
    if val is None:
        return default
    return val.strip().lower() == 'true'
@@ -134,23 +148,20 @@ class _Config(): # pylint: disable=too-few-public-methods ACCOUNT_SVC_AFFILIATE_URL = os.getenv('ACCOUNT_SVC_AFFILIATE_URL') ACCOUNT_SVC_CLIENT_ID = os.getenv('ACCOUNT_SVC_CLIENT_ID') ACCOUNT_SVC_CLIENT_SECRET = os.getenv('ACCOUNT_SVC_CLIENT_SECRET') - ACCOUNT_SVC_TIMEOUT = os.getenv('ACCOUNT_SVC_TIMEOUT') - ACCOUNT_SVC_TIMEOUT = int(ACCOUNT_SVC_TIMEOUT) if ACCOUNT_SVC_TIMEOUT.isnumeric() else None + ACCOUNT_SVC_TIMEOUT = _get_int('ACCOUNT_SVC_TIMEOUT', 0) + ACCOUNT_SVC_TIMEOUT = int(ACCOUNT_SVC_TIMEOUT) if ACCOUNT_SVC_TIMEOUT > 0 else None # batch delete flow - DELETE_BATCHES = os.getenv('DELETE_BATCHES') - DELETE_BATCHES = int(DELETE_BATCHES) if DELETE_BATCHES.isnumeric() else 0 - DELETE_BATCH_SIZE = os.getenv('DELETE_BATCH_SIZE') - DELETE_BATCH_SIZE = int(DELETE_BATCH_SIZE) if DELETE_BATCH_SIZE.isnumeric() else 0 + DELETE_BATCHES = _get_int('DELETE_BATCHES', 0) + DELETE_BATCH_SIZE = _get_int('DELETE_BATCH_SIZE', 0) - DELETE_AUTH_RECORDS = os.getenv('DELETE_AUTH_RECORDS').lower() == 'true' - DELETE_CORP_PROCESSING_RECORDS = os.getenv('DELETE_CORP_PROCESSING_RECORDS').lower() == 'true' + # Fix footgun: env vars may be unset + DELETE_AUTH_RECORDS = _get_bool('DELETE_AUTH_RECORDS', False) + DELETE_CORP_PROCESSING_RECORDS = _get_bool('DELETE_CORP_PROCESSING_RECORDS', False) # tombstone flow - TOMBSTONE_BATCHES = os.getenv('TOMBSTONE_BATCHES') - TOMBSTONE_BATCHES = int(TOMBSTONE_BATCHES) if TOMBSTONE_BATCHES.isnumeric() else 0 - TOMBSTONE_BATCH_SIZE = os.getenv('TOMBSTONE_BATCH_SIZE') - TOMBSTONE_BATCH_SIZE = int(TOMBSTONE_BATCH_SIZE) if TOMBSTONE_BATCH_SIZE.isnumeric() else 0 + TOMBSTONE_BATCHES = _get_int('TOMBSTONE_BATCHES', 0) + TOMBSTONE_BATCH_SIZE = _get_int('TOMBSTONE_BATCH_SIZE', 0) # reservation (reserve_for_flow) query statement timeout (Postgres statement_timeout, in ms). # When set, long-running reservation queries fail fast instead of tying up a worker indefinitely. 
@@ -158,17 +169,14 @@ class _Config(): # pylint: disable=too-few-public-methods RESERVE_STATEMENT_TIMEOUT_MS = int(RESERVE_STATEMENT_TIMEOUT_MS) if RESERVE_STATEMENT_TIMEOUT_MS.isnumeric() else None # verify flow - VERIFY_BATCHES = os.getenv('VERIFY_BATCHES') - VERIFY_BATCHES = int(VERIFY_BATCHES) if VERIFY_BATCHES.isnumeric() else 0 - VERIFY_BATCH_SIZE = os.getenv('VERIFY_BATCH_SIZE') - VERIFY_BATCH_SIZE = int(VERIFY_BATCH_SIZE) if VERIFY_BATCH_SIZE.isnumeric() else 0 + VERIFY_BATCHES = _get_int('VERIFY_BATCHES', 0) + VERIFY_BATCH_SIZE = _get_int('VERIFY_BATCH_SIZE', 0) VERIFY_SUMMARY_PATH = os.getenv('VERIFY_SUMMARY_PATH') # freeze flow - FREEZE_BATCHES = os.getenv('FREEZE_BATCHES') - FREEZE_BATCHES = int(FREEZE_BATCHES) if FREEZE_BATCHES.isnumeric() else 0 - FREEZE_BATCH_SIZE = os.getenv('FREEZE_BATCH_SIZE') - FREEZE_BATCH_SIZE = int(FREEZE_BATCH_SIZE) if FREEZE_BATCH_SIZE.isnumeric() else 0 + FREEZE_BATCHES = _get_int('FREEZE_BATCHES', 0) + FREEZE_BATCH_SIZE = _get_int('FREEZE_BATCH_SIZE', 0) + # ORACLE COLIN DB DB_USER_COLIN_ORACLE = os.getenv('DATABASE_USERNAME_COLIN_ORACLE', '') DB_PASSWORD_COLIN_ORACLE = os.getenv('DATABASE_PASSWORD_COLIN_ORACLE', '') @@ -189,6 +197,34 @@ class _Config(): # pylint: disable=too-few-public-methods MIG_GROUP_IDS = os.getenv('MIG_GROUP_IDS') MIG_BATCH_IDS = os.getenv('MIG_BATCH_IDS') + # ------------------------------------------------------------------------------------------ + # Auth-only flows (auth_processing tracking) + # ------------------------------------------------------------------------------------------ + # Selection + AUTH_SELECTION_MODE = os.getenv('AUTH_SELECTION_MODE', 'MIGRATION_FILTER') + AUTH_CORP_NUMS = os.getenv('AUTH_CORP_NUMS', '') + AUTH_SOURCE_FLOW_NAME = os.getenv('AUTH_SOURCE_FLOW_NAME', 'tombstone-flow') + + # Throughput + AUTH_BATCHES = _get_int('AUTH_BATCHES', 0) + AUTH_BATCH_SIZE = _get_int('AUTH_BATCH_SIZE', 0) + AUTH_MAX_WORKERS = _get_int('AUTH_MAX_WORKERS', 50) or 50 + + # Create plan + 
-- Tracking table for the auth-only flows: one row per (corp_num, flow_name, environment),
-- enforced by unq_auth_processing below.
create table if not exists auth_processing
(
    -- surrogate key, drawn from auth_processing_id_seq
    id                 integer default nextval('auth_processing_id_seq'::regclass) not null
        constraint pk_auth_processing primary key,

    -- the corporation being processed; must exist in corporation
    corp_num           varchar(10)  not null
        constraint fk_auth_processing_corporation references corporation (corp_num),
    corp_type_cd       varchar(3),
    -- optional link to the migration batch
    mig_batch_id       integer
        constraint fk_auth_processing_batch references mig_batch,

    -- which flow, in which environment, handled this corp and with what status
    flow_name          varchar(100) not null,
    environment        varchar(25)  not null,
    flow_run_id        uuid,
    processed_status   varchar(25)  not null,

    -- bookkeeping timestamps; both default to the insert time
    -- NOTE(review): no trigger is visible here, so last_modified is presumably maintained by the writer - confirm
    create_date        timestamp with time zone default current_timestamp not null,
    last_modified      timestamp with time zone default current_timestamp not null,
    claimed_at         timestamp with time zone,
    last_error         varchar(1000),

    -- safe inputs (no secrets)
    account_ids        varchar(100),
    contact_email      varchar(254),

    -- outcomes per component (queryable)
    entity_action      varchar(25),
    contact_action     varchar(25),
    affiliation_action varchar(25),
    invite_action      varchar(25),

    -- short structured summary, never secrets
    action_detail      varchar(2000),

    -- a corp is tracked at most once per flow and environment
    constraint unq_auth_processing unique (corp_num, flow_name, environment)
);

alter table auth_processing owner to postgres;

-- composite index keyed by environment/flow/run/status/claim time
-- NOTE(review): presumably supports the claim-batch query - confirm against the flow's SQL
create index if not exists idx_auth_processing_claim_batch
    on auth_processing (environment, flow_name, flow_run_id, processed_status, claimed_at);

-- composite index for lookups by flow + environment + status (with corp_num as trailing column)
create index if not exists idx_auth_processing_flow_env_status
    on auth_processing (flow_name, environment, processed_status, corp_num);