From d1d58c80fe77bf95c4f3185af2f7d75acbc4c8c7 Mon Sep 17 00:00:00 2001
From: Mohammad Amin
Date: Tue, 18 Feb 2025 14:01:58 +0330
Subject: [PATCH] feat: enhance community fetching with platform filtering and update workflow timeout

---
 hivemind_etl/activities.py               | 24 +++++++-
 hivemind_etl/website/module.py           | 11 ++++
 test_run_workflow.py                     |  3 +-
 tests/integration/test_module_website.py | 74 ++++++++++++++++++++++++
 workflows.py                             |  7 ++-
 5 files changed, 112 insertions(+), 7 deletions(-)

diff --git a/hivemind_etl/activities.py b/hivemind_etl/activities.py
index b773600..a8edaef 100644
--- a/hivemind_etl/activities.py
+++ b/hivemind_etl/activities.py
@@ -13,10 +13,28 @@
 
 
 @activity.defn
-async def get_communities() -> list[dict[str, Any]]:
-    """Fetch all communities that need to be processed."""
+async def get_communities(platform_id: str | None = None) -> list[dict[str, Any]]:
+    """
+    Fetch all communities that need to be processed when no platform id is given.
+    Otherwise, fetch only the community of the given platform.
+
+    Parameters
+    -----------
+    platform_id : str | None
+        The platform whose community should be fetched.
+        Defaults to `None`, meaning all communities are fetched.
+
+    Returns
+    ---------
+    communities : list[dict[str, Any]]
+        A list of communities holding website information.
+    """
     try:
-        communities = ModulesWebsite().get_learning_platforms()
+        if platform_id:
+            logger.info("Website ingestion is filtered to a single platform!")
+        communities = ModulesWebsite().get_learning_platforms(
+            filter_platform_id=platform_id
+        )
         logger.info(f"Found {len(communities)} communities to process")
         return communities
     except Exception as e:
diff --git a/hivemind_etl/website/module.py b/hivemind_etl/website/module.py
index 54684d0..bb0f59d 100644
--- a/hivemind_etl/website/module.py
+++ b/hivemind_etl/website/module.py
@@ -10,10 +10,17 @@ def __init__(self) -> None:
 
     def get_learning_platforms(
         self,
+        filter_platform_id: str | None = None,
     ) -> list[dict[str, str | list[str]]]:
         """
         Get all the website communities with their page titles.
 
+        Parameters
+        -----------
+        filter_platform_id : str | None
+            The platform whose community should be fetched.
+            Defaults to `None`, meaning all communities are fetched.
+
         Returns
         ---------
         community_orgs : list[dict[str, str | list[str]]] = []
@@ -41,6 +48,10 @@ def get_learning_platforms(
 
             platform_id = platform["platform"]
 
+            # if a specific platform was requested, skip all the others
+            if filter_platform_id and filter_platform_id != str(platform_id):
+                continue
+
             try:
                 website_links = self.get_platform_metadata(
                     platform_id=platform_id,
diff --git a/test_run_workflow.py b/test_run_workflow.py
index 0cb4e73..c0763cb 100644
--- a/test_run_workflow.py
+++ b/test_run_workflow.py
@@ -45,9 +45,10 @@ async def start_workflow():
                 # id="schedules-say-hello",
                 id="schedules-website-ingestion",
                 task_queue=task_queue,
+                args=["platform_id"],
             ),
             spec=ScheduleSpec(
-                intervals=[ScheduleIntervalSpec(every=timedelta(hours=1))]
+                intervals=[ScheduleIntervalSpec(every=timedelta(minutes=2))]
             ),
             state=ScheduleState(note="Here's a note on my Schedule."),
         ),
diff --git a/tests/integration/test_module_website.py b/tests/integration/test_module_website.py
index bcce915..206dee8 100644
--- a/tests/integration/test_module_website.py
+++ b/tests/integration/test_module_website.py
@@ -128,3 +128,77 @@ def test_get_website_communities_data_module_multiple_platforms(self):
                 "urls": ["link1", "link2"],
             },
         )
+
+    def test_get_website_communities_data_module_multiple_platforms_filtered_one(self):
+        """
+        Test get_learning_platforms when a community has multiple website platforms
+        and a platform filter is applied.
+        Verifies that only the filtered platform's data is returned.
+        """
+        platform_id = ObjectId("6579c364f1120850414e0dc6")
+        platform_id2 = ObjectId("6579c364f1120850414e0dc7")
+        community_id = ObjectId("6579c364f1120850414e0dc5")
+
+        self.client["Core"]["platforms"].insert_one(
+            {
+                "_id": platform_id,
+                "name": "website",
+                "metadata": {"resources": ["link1", "link2"]},
+                "community": community_id,
+                "disconnectedAt": None,
+                "connectedAt": datetime.now(),
+                "createdAt": datetime.now(),
+                "updatedAt": datetime.now(),
+            }
+        )
+
+        self.client["Core"]["platforms"].insert_one(
+            {
+                "_id": platform_id2,
+                "name": "website",
+                "metadata": {"resources": ["link3", "link4"]},
+                "community": community_id,
+                "disconnectedAt": None,
+                "connectedAt": datetime.now(),
+                "createdAt": datetime.now(),
+                "updatedAt": datetime.now(),
+            }
+        )
+
+        self.client["Core"]["modules"].insert_one(
+            {
+                "name": "hivemind",
+                "community": community_id,
+                "options": {
+                    "platforms": [
+                        {
+                            "platform": platform_id,
+                            "name": "website",
+                            "metadata": {},
+                        },
+                        {
+                            "platform": platform_id2,
+                            "name": "website",
+                            "metadata": {},
+                        },
+                    ]
+                },
+            }
+        )
+
+        result = self.modules_website.get_learning_platforms(
+            filter_platform_id=str(platform_id)
+        )
+
+        # Assertions
+        self.assertIsInstance(result, list)
+        self.assertEqual(len(result), 1)
+
+        self.assertEqual(
+            result[0],
+            {
+                "community_id": "6579c364f1120850414e0dc5",
+                "platform_id": str(platform_id),
+                "urls": ["link1", "link2"],
+            },
+        )
diff --git a/workflows.py b/workflows.py
index e090525..93896e5 100644
--- a/workflows.py
+++ b/workflows.py
@@ -69,13 +69,14 @@ async def run(self, community_info: dict) -> None:
 @workflow.defn
 class WebsiteIngestionSchedulerWorkflow:
     @workflow.run
-    async def run(self) -> None:
+    async def run(self, platform_id: str | None = None) -> None:
         # Get all communities
         communities = await workflow.execute_activity(
             get_communities,
-            start_to_close_timeout=timedelta(minutes=5),
+            platform_id,
+            start_to_close_timeout=timedelta(minutes=20),
             retry_policy=RetryPolicy(
-                maximum_attempts=1,
+                maximum_attempts=3,
             ),
         )
 
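
Usage sketch: the new optional platform_id argument can also be exercised directly, outside the schedule set up in test_run_workflow.py. In the sketch below the Temporal server address, task queue name, and workflow id are placeholder assumptions rather than values taken from this patch, and the platform id is simply the example ObjectId used in the new integration test; only WebsiteIngestionSchedulerWorkflow and its platform_id parameter come from the diff itself.

import asyncio

from temporalio.client import Client

from workflows import WebsiteIngestionSchedulerWorkflow


async def main() -> None:
    # Placeholder connection target; point this at the real Temporal server.
    client = await Client.connect("localhost:7233")

    # Passing a platform id makes get_communities() return only that platform's
    # community, so the workflow ingests a single website platform. Passing None
    # (or omitting the argument) keeps the original "process everything" behaviour.
    await client.execute_workflow(
        WebsiteIngestionSchedulerWorkflow.run,
        "6579c364f1120850414e0dc6",  # example platform id from the test fixtures
        id="website-ingestion-single-platform",  # placeholder workflow id
        task_queue="hivemind-etl-queue",  # placeholder task queue name
    )


if __name__ == "__main__":
    asyncio.run(main())

The scheduled path goes through the same parameter: ScheduleActionStartWorkflow in test_run_workflow.py now passes args=["platform_id"] (a placeholder value), while leaving the argument unset keeps the all-communities behaviour.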