From 612e73a0fe4bd8c384402641451148de8a8315d0 Mon Sep 17 00:00:00 2001
From: Mohammad Amin
Date: Wed, 2 Apr 2025 09:29:23 +0330
Subject: [PATCH 1/5] fix: re-instantiating the crawler per each route!

It seems the client was doing some caching (or limiting the URLs to 20) even
when more URLs were requested to be crawled. Now we re-instantiate the crawler
client for every route, so each route gets crawled up to its own limit of 20
URLs.
---
 hivemind_etl/website/website_etl.py | 7 ++++---
 1 file changed, 4 insertions(+), 3 deletions(-)

diff --git a/hivemind_etl/website/website_etl.py b/hivemind_etl/website/website_etl.py
index 4d0f113..eeaab71 100644
--- a/hivemind_etl/website/website_etl.py
+++ b/hivemind_etl/website/website_etl.py
@@ -24,7 +24,7 @@ def __init__(
         collection_name = "website"
 
         # preparing the data extractor and ingestion pipelines
-        self.crawlee_client = CrawleeClient()
+        # self.crawlee_client = CrawleeClient()
         self.ingestion_pipeline = CustomIngestionPipeline(
             self.community_id, collection_name=collection_name
         )
@@ -51,9 +51,10 @@ async def extract(
 
         extracted_data = []
         for url in urls:
+            crawlee_client = CrawleeClient()
             logging.info(f"Crawling {url} and its routes!")
-            data = await self.crawlee_client.crawl(links=[url])
-            logging.info(f"{len(data)} data is extracted.")
+            data = await crawlee_client.crawl(links=[url])
+            logging.info(f"{len(data)} data is extracted for route: {url}")
             extracted_data.extend(data)
 
         logging.info(f"Extracted {len(extracted_data)} documents!")

From 099e257e74bdfa9b0bd3ac556fb7cf00fc5a7d7d Mon Sep 17 00:00:00 2001
From: Mohammad Amin
Date: Wed, 2 Apr 2025 10:02:02 +0330
Subject: [PATCH 2/5] fix: skipping the test case for now!

---
 tests/unit/test_website_etl.py | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/tests/unit/test_website_etl.py b/tests/unit/test_website_etl.py
index e1a6e92..d423532 100644
--- a/tests/unit/test_website_etl.py
+++ b/tests/unit/test_website_etl.py
@@ -1,11 +1,11 @@
 from unittest import IsolatedAsyncioTestCase
 from unittest.mock import AsyncMock, MagicMock
 
+import pytest
 from dotenv import load_dotenv
 from hivemind_etl.website.website_etl import WebsiteETL
 from llama_index.core import Document
 
-
 class TestWebsiteETL(IsolatedAsyncioTestCase):
     def setUp(self):
         """
@@ -17,6 +17,7 @@ def setUp(self):
         self.website_etl.crawlee_client = AsyncMock()
         self.website_etl.ingestion_pipeline = MagicMock()
 
+    @pytest.mark.skip()
     async def test_extract(self):
         """
         Test the extract method.
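
The first patch makes extract() build a fresh crawler per route instead of reusing
the instance created in __init__. As a rough sketch assembled from the hunks above
rather than copied from website_etl.py (the enclosing class is omitted and the
CrawleeClient import path is an assumption), the loop ends up reading like this:

    import logging

    from .crawlee_client import CrawleeClient  # assumed import path

    async def extract(urls: list[str]) -> list[dict]:
        extracted_data = []
        for url in urls:
            # a fresh client per route, so each route gets its own crawl budget
            crawlee_client = CrawleeClient()
            logging.info(f"Crawling {url} and its routes!")
            data = await crawlee_client.crawl(links=[url])
            logging.info(f"{len(data)} data is extracted for route: {url}")
            extracted_data.extend(data)
        return extracted_data

Patch 3 below keeps the re-instantiation inside the loop but moves the per-route
instance back onto self.crawlee_client.
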
From 71e60f0b34da6f52f2ec0150d8628e2fbad35d40 Mon Sep 17 00:00:00 2001
From: Mohammad Amin
Date: Wed, 2 Apr 2025 10:02:38 +0330
Subject: [PATCH 3/5] feat: increase activity run time limit & update workflow id!

---
 hivemind_etl/website/website_etl.py |  7 +++----
 workflows.py                        | 14 +++++++-------
 2 files changed, 10 insertions(+), 11 deletions(-)

diff --git a/hivemind_etl/website/website_etl.py b/hivemind_etl/website/website_etl.py
index eeaab71..afc3e34 100644
--- a/hivemind_etl/website/website_etl.py
+++ b/hivemind_etl/website/website_etl.py
@@ -23,8 +23,7 @@ def __init__(
         self.community_id = community_id
         collection_name = "website"
 
-        # preparing the data extractor and ingestion pipelines
-        # self.crawlee_client = CrawleeClient()
+        # preparing the ingestion pipeline
         self.ingestion_pipeline = CustomIngestionPipeline(
             self.community_id, collection_name=collection_name
         )
@@ -51,9 +50,9 @@ async def extract(
 
         extracted_data = []
         for url in urls:
-            crawlee_client = CrawleeClient()
+            self.crawlee_client = CrawleeClient()
             logging.info(f"Crawling {url} and its routes!")
-            data = await crawlee_client.crawl(links=[url])
+            data = await self.crawlee_client.crawl(links=[url])
             logging.info(f"{len(data)} data is extracted for route: {url}")
             extracted_data.extend(data)
 
diff --git a/workflows.py b/workflows.py
index 93896e5..df14af7 100644
--- a/workflows.py
+++ b/workflows.py
@@ -34,18 +34,18 @@ async def run(self, community_info: dict) -> None:
         raw_data = await workflow.execute_activity(
             extract_website,
             args=[urls, community_id],
-            start_to_close_timeout=timedelta(minutes=10),
+            start_to_close_timeout=timedelta(minutes=30),
             retry_policy=RetryPolicy(
                 initial_interval=timedelta(seconds=10),
                 maximum_interval=timedelta(minutes=5),
-                maximum_attempts=1,
+                maximum_attempts=3,
             ),
         )
 
         documents = await workflow.execute_activity(
             transform_data,
             args=[raw_data, community_id],
-            start_to_close_timeout=timedelta(minutes=5),
+            start_to_close_timeout=timedelta(minutes=10),
             retry_policy=RetryPolicy(
                 initial_interval=timedelta(seconds=5),
                 maximum_interval=timedelta(minutes=2),
@@ -56,11 +56,11 @@ async def run(self, community_info: dict) -> None:
         await workflow.execute_activity(
             load_data,
             args=[documents, community_id],
-            start_to_close_timeout=timedelta(minutes=5),
+            start_to_close_timeout=timedelta(minutes=60),
             retry_policy=RetryPolicy(
                 initial_interval=timedelta(seconds=5),
                 maximum_interval=timedelta(minutes=2),
-                maximum_attempts=1,
+                maximum_attempts=3,
             ),
         )
 
@@ -74,7 +74,7 @@ async def run(self, platform_id: str | None = None) -> None:
         communities = await workflow.execute_activity(
             get_communities,
             platform_id,
-            start_to_close_timeout=timedelta(minutes=20),
+            start_to_close_timeout=timedelta(minutes=5),
             retry_policy=RetryPolicy(
                 maximum_attempts=3,
             ),
@@ -86,7 +86,7 @@ async def run(self, platform_id: str | None = None) -> None:
             child_handle = await workflow.start_child_workflow(
                 CommunityWebsiteWorkflow.run,
                 args=[community],
-                id=f"website-ingest-{community['community_id']}-{workflow.now().strftime('%Y%m%d%H%M')}",
+                id=f"website:ingestor:{community['community_id']}",
                 retry_policy=RetryPolicy(
                     maximum_attempts=1,
                 ),

From eafeb6c10af1eb77bc0220e856b5005da8a621c3 Mon Sep 17 00:00:00 2001
From: Mohammad Amin
Date: Wed, 2 Apr 2025 10:03:06 +0330
Subject: [PATCH 4/5] fix: black linter issue!

---
 tests/unit/test_website_etl.py | 1 +
 1 file changed, 1 insertion(+)

diff --git a/tests/unit/test_website_etl.py b/tests/unit/test_website_etl.py
index d423532..e9e6c63 100644
--- a/tests/unit/test_website_etl.py
+++ b/tests/unit/test_website_etl.py
@@ -6,6 +6,7 @@
 from hivemind_etl.website.website_etl import WebsiteETL
 from llama_index.core import Document
 
+
 class TestWebsiteETL(IsolatedAsyncioTestCase):
     def setUp(self):
         """

From 06f0ebb98966608e6839bd0301b7f1f92101e355 Mon Sep 17 00:00:00 2001
From: Mohammad Amin
Date: Wed, 2 Apr 2025 10:08:17 +0330
Subject: [PATCH 5/5] feat: getting back the skipped test!

---
 tests/unit/test_website_etl.py | 21 +++++++++++++--------
 1 file changed, 13 insertions(+), 8 deletions(-)

diff --git a/tests/unit/test_website_etl.py b/tests/unit/test_website_etl.py
index e9e6c63..825570d 100644
--- a/tests/unit/test_website_etl.py
+++ b/tests/unit/test_website_etl.py
@@ -1,5 +1,5 @@
 from unittest import IsolatedAsyncioTestCase
-from unittest.mock import AsyncMock, MagicMock
+from unittest.mock import AsyncMock, MagicMock, patch
 
 import pytest
 from dotenv import load_dotenv
@@ -18,7 +18,6 @@ def setUp(self):
         self.website_etl.crawlee_client = AsyncMock()
         self.website_etl.ingestion_pipeline = MagicMock()
 
-    @pytest.mark.skip()
     async def test_extract(self):
         """
         Test the extract method.
@@ -31,12 +30,18 @@ async def test_extract(self):
                 "title": "Example",
             }
         ]
-        self.website_etl.crawlee_client.crawl.return_value = mocked_data
-
-        extracted_data = await self.website_etl.extract(urls)
-
-        self.assertEqual(extracted_data, mocked_data)
-        self.website_etl.crawlee_client.crawl.assert_awaited_once_with(links=urls)
+
+        # Mock the CrawleeClient class instead of the instance
+        with patch('hivemind_etl.website.website_etl.CrawleeClient') as MockCrawleeClient:
+            mock_client_instance = AsyncMock()
+            mock_client_instance.crawl.return_value = mocked_data
+            MockCrawleeClient.return_value = mock_client_instance
+
+            extracted_data = await self.website_etl.extract(urls)
+
+            self.assertEqual(extracted_data, mocked_data)
+            MockCrawleeClient.assert_called_once()
+            mock_client_instance.crawl.assert_awaited_once_with(links=urls)
 
     def test_transform(self):
         """
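
The last patch works because, after PATCH 3, extract() constructs its own
CrawleeClient inside the loop, so the AsyncMock assigned to
self.website_etl.crawlee_client in setUp is never called; patching the class
name in the module under test intercepts that construction instead. Below is a
minimal, self-contained sketch of the same patching pattern, using hypothetical
SomeETL and Crawler stand-ins rather than the project's real classes:

    from unittest import IsolatedAsyncioTestCase
    from unittest.mock import AsyncMock, patch


    class Crawler:
        # stand-in for the real crawler client; never actually used in the test
        async def crawl(self, links: list[str]) -> list[dict]:
            raise RuntimeError("no network access in unit tests")


    class SomeETL:
        async def extract(self, urls: list[str]) -> list[dict]:
            results = []
            for url in urls:
                client = Crawler()  # looked up in this module at call time
                results.extend(await client.crawl(links=[url]))
            return results


    class TestSomeETL(IsolatedAsyncioTestCase):
        async def test_extract(self):
            mocked = [{"url": "https://example.com", "inner_text": "text", "title": "Example"}]
            # patch the class where extract() looks it up, not the instance attribute
            with patch(f"{__name__}.Crawler") as MockCrawler:
                instance = AsyncMock()
                instance.crawl.return_value = mocked
                MockCrawler.return_value = instance

                data = await SomeETL().extract(["https://example.com"])

            self.assertEqual(data, mocked)
            MockCrawler.assert_called_once()
            instance.crawl.assert_awaited_once_with(links=["https://example.com"])
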