diff --git a/hivemind_etl/website/website_etl.py b/hivemind_etl/website/website_etl.py
index 4d0f113..afc3e34 100644
--- a/hivemind_etl/website/website_etl.py
+++ b/hivemind_etl/website/website_etl.py
@@ -23,8 +23,7 @@ def __init__(
         self.community_id = community_id
         collection_name = "website"
 
-        # preparing the data extractor and ingestion pipelines
-        self.crawlee_client = CrawleeClient()
+        # preparing the ingestion pipeline
         self.ingestion_pipeline = CustomIngestionPipeline(
             self.community_id, collection_name=collection_name
         )
@@ -51,9 +50,10 @@ async def extract(
 
         extracted_data = []
         for url in urls:
+            self.crawlee_client = CrawleeClient()
             logging.info(f"Crawling {url} and its routes!")
             data = await self.crawlee_client.crawl(links=[url])
-            logging.info(f"{len(data)} data is extracted.")
+            logging.info(f"{len(data)} documents extracted for route: {url}")
             extracted_data.extend(data)
 
         logging.info(f"Extracted {len(extracted_data)} documents!")
diff --git a/tests/unit/test_website_etl.py b/tests/unit/test_website_etl.py
index e1a6e92..825570d 100644
--- a/tests/unit/test_website_etl.py
+++ b/tests/unit/test_website_etl.py
@@ -1,6 +1,7 @@
 from unittest import IsolatedAsyncioTestCase
-from unittest.mock import AsyncMock, MagicMock
+from unittest.mock import AsyncMock, MagicMock, patch
 
+import pytest
 from dotenv import load_dotenv
 from hivemind_etl.website.website_etl import WebsiteETL
 from llama_index.core import Document
@@ -29,12 +30,18 @@ async def test_extract(self):
                 "title": "Example",
             }
         ]
-        self.website_etl.crawlee_client.crawl.return_value = mocked_data
-
-        extracted_data = await self.website_etl.extract(urls)
-
-        self.assertEqual(extracted_data, mocked_data)
-        self.website_etl.crawlee_client.crawl.assert_awaited_once_with(links=urls)
+
+        # Mock the CrawleeClient class instead of the instance
+        with patch('hivemind_etl.website.website_etl.CrawleeClient') as MockCrawleeClient:
+            mock_client_instance = AsyncMock()
+            mock_client_instance.crawl.return_value = mocked_data
+            MockCrawleeClient.return_value = mock_client_instance
+
+            extracted_data = await self.website_etl.extract(urls)
+
+            self.assertEqual(extracted_data, mocked_data)
+            MockCrawleeClient.assert_called_once()
+            mock_client_instance.crawl.assert_awaited_once_with(links=urls)
 
     def test_transform(self):
         """
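
The test change follows directly from the production change: since `extract` now constructs a fresh `CrawleeClient` inside its loop, a mock attached to a pre-built instance in `setUp` would be replaced before `crawl` is ever awaited, so the test has to intercept the constructor itself. A minimal, self-contained sketch of that pattern, assuming illustrative stand-in names (`Client`, `extract`, `main` here are not the repo's code):

```python
# Sketch: patching a class so a constructor call *inside* the code under
# test returns a mock. Names are illustrative stand-ins, not the repo's.
import asyncio
from unittest.mock import AsyncMock, patch


class Client:
    """Stand-in for CrawleeClient: crawl() would normally hit the network."""

    async def crawl(self, links: list[str]) -> list[dict]:
        raise RuntimeError("real network call")


async def extract(urls: list[str]) -> list[dict]:
    data: list[dict] = []
    for url in urls:
        client = Client()  # fresh client per URL, mirroring WebsiteETL.extract
        data.extend(await client.crawl(links=[url]))
    return data


async def main() -> None:
    mocked = [{"url": "https://example.com", "title": "Example"}]
    # Patch the name where it is looked up, so Client() inside extract()
    # returns our mock instance instead of a real client.
    with patch(f"{__name__}.Client") as MockClient:
        instance = AsyncMock()
        instance.crawl.return_value = mocked
        MockClient.return_value = instance

        assert await extract(["https://example.com"]) == mocked
        instance.crawl.assert_awaited_once_with(links=["https://example.com"])


asyncio.run(main())
```

The key detail, same as in the diff above, is patching the name at its point of use (`hivemind_etl.website.website_etl.CrawleeClient`) rather than where the class is defined; that is what makes the in-loop construction return the mock.
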
diff --git a/workflows.py b/workflows.py
index 93896e5..df14af7 100644
--- a/workflows.py
+++ b/workflows.py
@@ -34,18 +34,18 @@ async def run(self, community_info: dict) -> None:
         raw_data = await workflow.execute_activity(
             extract_website,
             args=[urls, community_id],
-            start_to_close_timeout=timedelta(minutes=10),
+            start_to_close_timeout=timedelta(minutes=30),
             retry_policy=RetryPolicy(
                 initial_interval=timedelta(seconds=10),
                 maximum_interval=timedelta(minutes=5),
-                maximum_attempts=1,
+                maximum_attempts=3,
             ),
         )
 
         documents = await workflow.execute_activity(
             transform_data,
             args=[raw_data, community_id],
-            start_to_close_timeout=timedelta(minutes=5),
+            start_to_close_timeout=timedelta(minutes=10),
             retry_policy=RetryPolicy(
                 initial_interval=timedelta(seconds=5),
                 maximum_interval=timedelta(minutes=2),
@@ -56,11 +56,11 @@ async def run(self, community_info: dict) -> None:
         await workflow.execute_activity(
             load_data,
             args=[documents, community_id],
-            start_to_close_timeout=timedelta(minutes=5),
+            start_to_close_timeout=timedelta(minutes=60),
             retry_policy=RetryPolicy(
                 initial_interval=timedelta(seconds=5),
                 maximum_interval=timedelta(minutes=2),
-                maximum_attempts=1,
+                maximum_attempts=3,
             ),
         )
 
@@ -74,7 +74,7 @@ async def run(self, platform_id: str | None = None) -> None:
         communities = await workflow.execute_activity(
             get_communities,
             platform_id,
-            start_to_close_timeout=timedelta(minutes=20),
+            start_to_close_timeout=timedelta(minutes=5),
             retry_policy=RetryPolicy(
                 maximum_attempts=3,
             ),
@@ -86,7 +86,7 @@ async def run(self, platform_id: str | None = None) -> None:
             child_handle = await workflow.start_child_workflow(
                 CommunityWebsiteWorkflow.run,
                 args=[community],
-                id=f"website-ingest-{community['community_id']}-{workflow.now().strftime('%Y%m%d%H%M')}",
+                id=f"website:ingestor:{community['community_id']}",
                 retry_policy=RetryPolicy(
                     maximum_attempts=1,
                 ),
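
Two behavioral notes on the Temporal changes above. First, the extract activity can now occupy up to three 30-minute attempts; the diff does not set `backoff_coefficient`, so assuming Temporal's documented default of 2.0, the waits between attempts are 10s and then 20s (capped at `maximum_interval`). A small sketch of that retry arithmetic, under that assumption:

```python
# Hedged sketch of the retry schedule for the extract activity's RetryPolicy.
# backoff_coefficient is NOT set in the diff; 2.0 is Temporal's default and
# is assumed here purely for illustration.
from datetime import timedelta

initial_interval = timedelta(seconds=10)
maximum_interval = timedelta(minutes=5)
backoff_coefficient = 2.0
maximum_attempts = 3

delay = initial_interval
for attempt in range(1, maximum_attempts):
    # a wait only happens after a failed attempt that is not the last one
    print(f"after failed attempt {attempt}: wait {min(delay, maximum_interval)}")
    delay = timedelta(seconds=delay.total_seconds() * backoff_coefficient)
print(f"attempt {maximum_attempts} fails -> activity fails for the workflow")
```

Second, the child workflow ID is now deterministic (`website:ingestor:{community_id}`), which de-duplicates ingestion per community, but note that workflow IDs must be unique among open executions: if the scheduler fires again while a previous run for the same community is still in flight, the start attempt will be rejected, whereas the old timestamp-suffixed ID avoided that collision.
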