From 14ef9372a222188f5a9b80c6bf174e496c3a3037 Mon Sep 17 00:00:00 2001 From: Yhtyyar Sahatov Date: Thu, 16 Jun 2022 15:53:19 +0300 Subject: [PATCH 01/14] working hisotic crawler --- .../mooncrawl/moonworm_crawler/cli.py | 179 +++++++++++++++++- .../moonworm_crawler/continuous_crawler.py | 29 +-- .../mooncrawl/moonworm_crawler/crawler.py | 30 +++ .../mooncrawl/moonworm_crawler/db.py | 35 ++-- .../moonworm_crawler/historical_crawler.py | 134 +++++++++++++ 5 files changed, 365 insertions(+), 42 deletions(-) create mode 100644 crawlers/mooncrawl/mooncrawl/moonworm_crawler/historical_crawler.py diff --git a/crawlers/mooncrawl/mooncrawl/moonworm_crawler/cli.py b/crawlers/mooncrawl/mooncrawl/moonworm_crawler/cli.py index a4bb03a5f..df0a81949 100644 --- a/crawlers/mooncrawl/mooncrawl/moonworm_crawler/cli.py +++ b/crawlers/mooncrawl/mooncrawl/moonworm_crawler/cli.py @@ -10,6 +10,7 @@ from ..blockchain import AvailableBlockchainType from ..settings import MOONSTREAM_MOONWORM_TASKS_JOURNAL, NB_CONTROLLER_ACCESS_ID from .continuous_crawler import _retry_connect_web3, continuous_crawler +from .historical_crawler import historical_crawler from .crawler import ( SubscriptionTypes, blockchain_type_to_subscription_type, @@ -17,7 +18,7 @@ make_event_crawl_jobs, make_function_call_crawl_jobs, ) -from .db import get_last_labeled_block_number +from .db import get_first_labeled_block_number, get_last_labeled_block_number logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) @@ -114,6 +115,117 @@ def handle_crawl(args: argparse.Namespace) -> None: ) +def handle_historical_crawl(args: argparse.Namespace) -> None: + blockchain_type = AvailableBlockchainType(args.blockchain_type) + subscription_type = blockchain_type_to_subscription_type(blockchain_type) + + addresses_filter = [args.address] + all_event_jobs = make_event_crawl_jobs( + get_crawl_job_entries( + subscription_type, + "event", + MOONSTREAM_MOONWORM_TASKS_JOURNAL, + ) + ) + filtered_event_jobs = [] + for job in all_event_jobs: + intersection = [ + address for address in job.contracts if address in addresses_filter + ] + if intersection: + job.contracts = intersection + filtered_event_jobs.append(job) + + logger.info(f"Filtered event crawl jobs count: {len(filtered_event_jobs)}") + + all_function_call_jobs = make_function_call_crawl_jobs( + get_crawl_job_entries( + subscription_type, + "function", + MOONSTREAM_MOONWORM_TASKS_JOURNAL, + ) + ) + filtered_function_call_jobs = [ + job + for job in all_function_call_jobs + if job.contract_address in addresses_filter + ] + + logger.info( + f"Initial function call crawl jobs count: {len(filtered_function_call_jobs)}" + ) + + logger.info(f"Blockchain type: {blockchain_type.value}") + with yield_db_session_ctx() as db_session: + web3: Optional[Web3] = None + if args.web3 is None: + logger.info( + "No web3 provider URL provided, using default (blockchan.py: connect())" + ) + web3 = _retry_connect_web3(blockchain_type, access_id=args.access_id) + else: + logger.info(f"Using web3 provider URL: {args.web3}") + web3 = Web3( + Web3.HTTPProvider( + args.web3, + ) + ) + if args.poa: + logger.info("Using PoA middleware") + web3.middleware_onion.inject(geth_poa_middleware, layer=0) + + last_labeled_block = get_first_labeled_block_number( + db_session, blockchain_type, args.address + ) + logger.info(f"Last labeled block: {last_labeled_block}") + + start_block = args.start + if start_block is None: + logger.info("No start block provided") + if last_labeled_block is not None: + start_block = last_labeled_block + logger.info(f"Using last labeled block as start: {start_block}") + else: + logger.info( + "No last labeled block found, using start block (web3.eth.blockNumber - 300)" + ) + raise ValueError( + "No start block provided and no last labeled block found" + ) + elif last_labeled_block is not None: + if start_block > last_labeled_block and not args.force: + logger.info( + f"Start block is less than last labeled block, using last labeled block: {last_labeled_block}" + ) + logger.info( + f"Use --force to override this and start from the start block: {start_block}" + ) + + start_block = last_labeled_block + else: + logger.info(f"Using start block: {start_block}") + else: + logger.info(f"Using start block: {start_block}") + + if start_block < args.end: + raise ValueError( + f"Start block {start_block} is less than end block {args.end}. This crawler crawls in the reverse direction." + ) + + historical_crawler( + db_session, + blockchain_type, + web3, + filtered_event_jobs, + filtered_function_call_jobs, + start_block, + args.end, + args.max_blocks_batch, + args.min_sleep_time, + access_id=args.access_id, + ) + + def main() -> None: parser = argparse.ArgumentParser() parser.set_defaults(func=lambda _: parser.print_help()) @@ -211,6 +323,71 @@ def main() -> None: crawl_parser.set_defaults(func=handle_crawl) + historical_crawl_parser = subparsers.add_parser( + "historical-crawl", help="Crawl historical data" + ) + historical_crawl_parser.add_argument( + "--address", + "-a", + required=True, + type=str, + ) + historical_crawl_parser.add_argument( + "--start", + "-s", + type=int, + default=None, + ) + historical_crawl_parser.add_argument( + "--end", + "-e", + type=int, + required=True, + ) + historical_crawl_parser.add_argument( + "--blockchain-type", + "-b", + type=str, + help=f"Available blockchain types: {[member.value for member in AvailableBlockchainType]}", + ) + historical_crawl_parser.add_argument( + "--web3", + type=str, + default=None, + help="Web3 provider URL", + ) + historical_crawl_parser.add_argument( + "--poa", + action="store_true", + default=False, + help="Use PoA middleware", + ) + + historical_crawl_parser.add_argument( + "--max-blocks-batch", + "-m", + type=int, + default=80, + help="Maximum number of blocks to crawl in a single batch", + ) + + historical_crawl_parser.add_argument( + "--min-sleep-time", + "-t", + type=float, + default=0.1, + help="Minimum time to sleep between crawl step", + ) + + historical_crawl_parser.add_argument( + "--force", + action="store_true", + default=False, + help="Force start from the start block", + ) + + historical_crawl_parser.set_defaults(func=handle_historical_crawl) + args = parser.parse_args() args.func(args) diff --git a/crawlers/mooncrawl/mooncrawl/moonworm_crawler/continuous_crawler.py b/crawlers/mooncrawl/mooncrawl/moonworm_crawler/continuous_crawler.py index d7cfb6229..f77cac5f3 100644 --- a/crawlers/mooncrawl/mooncrawl/moonworm_crawler/continuous_crawler.py +++ b/crawlers/mooncrawl/mooncrawl/moonworm_crawler/continuous_crawler.py @@ -24,6 +24,7 @@ make_function_call_crawl_jobs, merge_event_crawl_jobs, merge_function_call_crawl_jobs, + _retry_connect_web3, ) from .db import add_events_to_session, add_function_calls_to_session, commit_session from .event_crawler import _crawl_events @@ -82,34 +83,6 @@ def _refetch_new_jobs( return event_crawl_jobs, function_call_crawl_jobs -def _retry_connect_web3( - blockchain_type: AvailableBlockchainType, - retry_count: int = 10, - sleep_time: float = 5, - access_id: Optional[UUID] = None, -) -> Web3: - """ - Retry connecting to the blockchain. - """ - while retry_count > 0: - retry_count -= 1 - try: - web3 = connect(blockchain_type, access_id=access_id) - web3.eth.block_number - logger.info(f"Connected to {blockchain_type}") - return web3 - except Exception as e: - if retry_count == 0: - error = e - break - logger.error(f"Failed to connect to {blockchain_type} blockchain: {e}") - logger.info(f"Retrying in {sleep_time} seconds") - time.sleep(sleep_time) - raise Exception( - f"Failed to connect to {blockchain_type} blockchain after {retry_count} retries: {error}" - ) - - def continuous_crawler( db_session: Session, blockchain_type: AvailableBlockchainType, diff --git a/crawlers/mooncrawl/mooncrawl/moonworm_crawler/crawler.py b/crawlers/mooncrawl/mooncrawl/moonworm_crawler/crawler.py index f740aaeea..ce1cd48f3 100644 --- a/crawlers/mooncrawl/mooncrawl/moonworm_crawler/crawler.py +++ b/crawlers/mooncrawl/mooncrawl/moonworm_crawler/crawler.py @@ -6,6 +6,7 @@ from datetime import datetime from enum import Enum from typing import Any, Callable, Dict, List, Optional, cast +from uuid import UUID from bugout.data import BugoutSearchResult from eth_typing.evm import ChecksumAddress @@ -15,6 +16,7 @@ from mooncrawl.data import AvailableBlockchainType +from ..blockchain import connect from ..reporter import reporter from ..settings import ( MOONSTREAM_ADMIN_ACCESS_TOKEN, @@ -93,6 +95,34 @@ def reporter_callback(error: Exception) -> None: return reporter_callback +def _retry_connect_web3( + blockchain_type: AvailableBlockchainType, + retry_count: int = 10, + sleep_time: float = 5, + access_id: Optional[UUID] = None, +) -> Web3: + """ + Retry connecting to the blockchain. + """ + while retry_count > 0: + retry_count -= 1 + try: + web3 = connect(blockchain_type, access_id=access_id) + web3.eth.block_number + logger.info(f"Connected to {blockchain_type}") + return web3 + except Exception as e: + if retry_count == 0: + error = e + break + logger.error(f"Failed to connect to {blockchain_type} blockchain: {e}") + logger.info(f"Retrying in {sleep_time} seconds") + time.sleep(sleep_time) + raise Exception( + f"Failed to connect to {blockchain_type} blockchain after {retry_count} retries: {error}" + ) + + def blockchain_type_to_subscription_type( blockchain_type: AvailableBlockchainType, ) -> SubscriptionTypes: diff --git a/crawlers/mooncrawl/mooncrawl/moonworm_crawler/db.py b/crawlers/mooncrawl/mooncrawl/moonworm_crawler/db.py index 80615c676..e456f11dc 100644 --- a/crawlers/mooncrawl/mooncrawl/moonworm_crawler/db.py +++ b/crawlers/mooncrawl/mooncrawl/moonworm_crawler/db.py @@ -1,24 +1,14 @@ import logging from typing import Any, Dict, List, Optional, Union -from eth_typing.evm import ChecksumAddress -from hexbytes.main import HexBytes -from moonstreamdb.db import yield_db_session_ctx -from moonstreamdb.models import ( - Base, - EthereumLabel, - EthereumTransaction, - PolygonLabel, - PolygonTransaction, -) + +from moonstreamdb.models import Base from moonworm.crawler.function_call_crawler import ContractFunctionCall # type: ignore from sqlalchemy.orm import Session -from sqlalchemy.sql.expression import label -from ..blockchain import connect, get_block_model, get_label_model +from ..blockchain import get_label_model from ..data import AvailableBlockchainType from ..settings import CRAWLER_LABEL -from .crawler import FunctionCallCrawlJob, _generate_reporter_callback from .event_crawler import Event logging.basicConfig(level=logging.INFO) @@ -93,6 +83,25 @@ def get_last_labeled_block_number( return block_number[0] if block_number else None +def get_first_labeled_block_number( + db_session: Session, + blockchain_type: AvailableBlockchainType, + address: str, + label_name=CRAWLER_LABEL, +) -> Optional[int]: + label_model = get_label_model(blockchain_type) + + block_numbers = ( + db_session.query(label_model.block_number) + .filter(label_model.label == label_name) + .filter(label_model.address == address) + .order_by(label_model.block_number.asc()) + .limit(15) + .all() + ) + return block_numbers[0][0] if block_numbers else None + + def commit_session(db_session: Session) -> None: """ Save labels in the database. diff --git a/crawlers/mooncrawl/mooncrawl/moonworm_crawler/historical_crawler.py b/crawlers/mooncrawl/mooncrawl/moonworm_crawler/historical_crawler.py new file mode 100644 index 000000000..d91360a9e --- /dev/null +++ b/crawlers/mooncrawl/mooncrawl/moonworm_crawler/historical_crawler.py @@ -0,0 +1,134 @@ +import logging +import logging +import time +from typing import Dict, List, Optional, Tuple +from uuid import UUID + +from moonworm.crawler.moonstream_ethereum_state_provider import ( # type: ignore + MoonstreamEthereumStateProvider, +) +from moonworm.crawler.networks import Network # type: ignore +from sqlalchemy.orm.session import Session +from web3 import Web3 + +from ..data import AvailableBlockchainType +from .crawler import ( + EventCrawlJob, + FunctionCallCrawlJob, + _retry_connect_web3, +) +from .db import add_events_to_session, add_function_calls_to_session, commit_session +from .event_crawler import _crawl_events +from .function_call_crawler import _crawl_functions + + +logging.basicConfig(level=logging.INFO) +logger = logging.getLogger(__name__) + + +def historical_crawler( + db_session: Session, + blockchain_type: AvailableBlockchainType, + web3: Optional[Web3], + event_crawl_jobs: List[EventCrawlJob], + function_call_crawl_jobs: List[FunctionCallCrawlJob], + start_block: int, + end_block: int, + max_blocks_batch: int = 100, + min_sleep_time: float = 0.1, + access_id: Optional[UUID] = None, +): + assert max_blocks_batch > 0, "max_blocks_batch must be greater than 0" + assert min_sleep_time > 0, "min_sleep_time must be greater than 0" + assert start_block >= end_block, "start_block must be greater than end_block" + assert end_block > 0, "end_block must be greater than 0" + + if web3 is None: + web3 = _retry_connect_web3(blockchain_type, access_id=access_id) + + assert ( + web3.eth.block_number >= start_block + ), "start_block must be less than current block" + + network = ( + Network.ethereum + if blockchain_type == AvailableBlockchainType.ETHEREUM + else Network.polygon + ) + ethereum_state_provider = MoonstreamEthereumStateProvider( + web3, + network, + db_session, + ) + + logger.info(f"Starting historical event crawler start_block={start_block}") + + blocks_cache: Dict[int, int] = {} + failed_count = 0 + + while start_block >= end_block: + try: + # query db with limit 1, to avoid session closing + + time.sleep(min_sleep_time) + + batch_end_block = max( + start_block - max_blocks_batch, + end_block, + ) + + logger.info(f"Crawling events from {start_block} to {batch_end_block}") + all_events = _crawl_events( + db_session=db_session, + blockchain_type=blockchain_type, + web3=web3, + jobs=event_crawl_jobs, + from_block=batch_end_block, + to_block=start_block, + blocks_cache=blocks_cache, + db_block_query_batch=max_blocks_batch * 2, + ) + logger.info( + f"Crawled {len(all_events)} events from {start_block} to {batch_end_block}." + ) + + add_events_to_session(db_session, all_events, blockchain_type) + + logger.info( + f"Crawling function calls from {start_block} to {batch_end_block}" + ) + all_function_calls = _crawl_functions( + blockchain_type, + ethereum_state_provider, + function_call_crawl_jobs, + batch_end_block, + start_block, + ) + logger.info( + f"Crawled {len(all_function_calls)} function calls from {start_block} to {batch_end_block}." + ) + logger.info(f"{ethereum_state_provider.metrics}") + + add_function_calls_to_session( + db_session, all_function_calls, blockchain_type + ) + + # Commiting to db + commit_session(db_session) + + start_block = batch_end_block - 1 + failed_count = 0 + except Exception as e: + + logger.error(f"Internal error: {e}") + logger.exception(e) + failed_count += 1 + if failed_count > 10: + logger.error("Too many failures, exiting") + raise e + try: + web3 = _retry_connect_web3(blockchain_type, access_id=access_id) + except Exception as err: + logger.error(f"Failed to reconnect: {err}") + logger.exception(err) + raise err From a3f2418820a451810dfb6bd4d5f4c2a19de6c2f9 Mon Sep 17 00:00:00 2001 From: Yhtyyar Sahatov Date: Thu, 16 Jun 2022 15:54:19 +0300 Subject: [PATCH 02/14] isort --- crawlers/mooncrawl/mooncrawl/generic_crawler/cli.py | 2 +- crawlers/mooncrawl/mooncrawl/moonworm_crawler/cli.py | 2 +- .../mooncrawl/moonworm_crawler/continuous_crawler.py | 2 +- crawlers/mooncrawl/mooncrawl/moonworm_crawler/db.py | 1 - .../mooncrawl/moonworm_crawler/historical_crawler.py | 8 +------- 5 files changed, 4 insertions(+), 11 deletions(-) diff --git a/crawlers/mooncrawl/mooncrawl/generic_crawler/cli.py b/crawlers/mooncrawl/mooncrawl/generic_crawler/cli.py index ed7845bbf..075698ecc 100644 --- a/crawlers/mooncrawl/mooncrawl/generic_crawler/cli.py +++ b/crawlers/mooncrawl/mooncrawl/generic_crawler/cli.py @@ -11,8 +11,8 @@ from mooncrawl.data import AvailableBlockchainType # type: ignore from ..blockchain import connect -from .base import crawl, get_checkpoint, populate_with_events from ..settings import NB_CONTROLLER_ACCESS_ID +from .base import crawl, get_checkpoint, populate_with_events logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) diff --git a/crawlers/mooncrawl/mooncrawl/moonworm_crawler/cli.py b/crawlers/mooncrawl/mooncrawl/moonworm_crawler/cli.py index df0a81949..bf7a48561 100644 --- a/crawlers/mooncrawl/mooncrawl/moonworm_crawler/cli.py +++ b/crawlers/mooncrawl/mooncrawl/moonworm_crawler/cli.py @@ -10,7 +10,6 @@ from ..blockchain import AvailableBlockchainType from ..settings import MOONSTREAM_MOONWORM_TASKS_JOURNAL, NB_CONTROLLER_ACCESS_ID from .continuous_crawler import _retry_connect_web3, continuous_crawler -from .historical_crawler import historical_crawler from .crawler import ( SubscriptionTypes, blockchain_type_to_subscription_type, @@ -19,6 +18,7 @@ make_function_call_crawl_jobs, ) from .db import get_first_labeled_block_number, get_last_labeled_block_number +from .historical_crawler import historical_crawler logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) diff --git a/crawlers/mooncrawl/mooncrawl/moonworm_crawler/continuous_crawler.py b/crawlers/mooncrawl/mooncrawl/moonworm_crawler/continuous_crawler.py index f77cac5f3..900df29d9 100644 --- a/crawlers/mooncrawl/mooncrawl/moonworm_crawler/continuous_crawler.py +++ b/crawlers/mooncrawl/mooncrawl/moonworm_crawler/continuous_crawler.py @@ -17,6 +17,7 @@ from .crawler import ( EventCrawlJob, FunctionCallCrawlJob, + _retry_connect_web3, blockchain_type_to_subscription_type, get_crawl_job_entries, heartbeat, @@ -24,7 +25,6 @@ make_function_call_crawl_jobs, merge_event_crawl_jobs, merge_function_call_crawl_jobs, - _retry_connect_web3, ) from .db import add_events_to_session, add_function_calls_to_session, commit_session from .event_crawler import _crawl_events diff --git a/crawlers/mooncrawl/mooncrawl/moonworm_crawler/db.py b/crawlers/mooncrawl/mooncrawl/moonworm_crawler/db.py index e456f11dc..59fde7748 100644 --- a/crawlers/mooncrawl/mooncrawl/moonworm_crawler/db.py +++ b/crawlers/mooncrawl/mooncrawl/moonworm_crawler/db.py @@ -1,7 +1,6 @@ import logging from typing import Any, Dict, List, Optional, Union - from moonstreamdb.models import Base from moonworm.crawler.function_call_crawler import ContractFunctionCall # type: ignore from sqlalchemy.orm import Session diff --git a/crawlers/mooncrawl/mooncrawl/moonworm_crawler/historical_crawler.py b/crawlers/mooncrawl/mooncrawl/moonworm_crawler/historical_crawler.py index d91360a9e..1a3f2c72d 100644 --- a/crawlers/mooncrawl/mooncrawl/moonworm_crawler/historical_crawler.py +++ b/crawlers/mooncrawl/mooncrawl/moonworm_crawler/historical_crawler.py @@ -1,5 +1,4 @@ import logging -import logging import time from typing import Dict, List, Optional, Tuple from uuid import UUID @@ -12,16 +11,11 @@ from web3 import Web3 from ..data import AvailableBlockchainType -from .crawler import ( - EventCrawlJob, - FunctionCallCrawlJob, - _retry_connect_web3, -) +from .crawler import EventCrawlJob, FunctionCallCrawlJob, _retry_connect_web3 from .db import add_events_to_session, add_function_calls_to_session, commit_session from .event_crawler import _crawl_events from .function_call_crawler import _crawl_functions - logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) From a5a9df8695b94fd8d9e5357036a7ea84562d1646 Mon Sep 17 00:00:00 2001 From: Yhtyyar Sahatov Date: Tue, 21 Jun 2022 14:31:26 +0300 Subject: [PATCH 03/14] added --- .../mooncrawl/moonworm_crawler/cli.py | 11 ++++++- .../mooncrawl/moonworm_crawler/db.py | 29 ++++++++++++++--- .../moonworm_crawler/historical_crawler.py | 31 +++++++++---------- 3 files changed, 49 insertions(+), 22 deletions(-) diff --git a/crawlers/mooncrawl/mooncrawl/moonworm_crawler/cli.py b/crawlers/mooncrawl/mooncrawl/moonworm_crawler/cli.py index bf7a48561..d9d9250af 100644 --- a/crawlers/mooncrawl/mooncrawl/moonworm_crawler/cli.py +++ b/crawlers/mooncrawl/mooncrawl/moonworm_crawler/cli.py @@ -151,6 +151,10 @@ def handle_historical_crawl(args: argparse.Namespace) -> None: if job.contract_address in addresses_filter ] + if args.only_events: + filtered_function_call_jobs = [] + logger.info(f"Removing function call crawl jobs since --only-events is set") + logger.info( f"Initial function call crawl jobs count: {len(filtered_function_call_jobs)}" ) @@ -385,7 +389,12 @@ def main() -> None: default=False, help="Force start from the start block", ) - + historical_crawl_parser.add_argument( + "--only-events", + action="store_true", + default=False, + help="Only crawl events", + ) historical_crawl_parser.set_defaults(func=handle_historical_crawl) args = parser.parse_args() diff --git a/crawlers/mooncrawl/mooncrawl/moonworm_crawler/db.py b/crawlers/mooncrawl/mooncrawl/moonworm_crawler/db.py index 59fde7748..29c2f9f75 100644 --- a/crawlers/mooncrawl/mooncrawl/moonworm_crawler/db.py +++ b/crawlers/mooncrawl/mooncrawl/moonworm_crawler/db.py @@ -87,18 +87,37 @@ def get_first_labeled_block_number( blockchain_type: AvailableBlockchainType, address: str, label_name=CRAWLER_LABEL, + only_events: bool = False, ) -> Optional[int]: label_model = get_label_model(blockchain_type) - - block_numbers = ( + block_number_query = ( db_session.query(label_model.block_number) .filter(label_model.label == label_name) .filter(label_model.address == address) - .order_by(label_model.block_number.asc()) - .limit(15) + ) + + function_call_block_numbers = ( + block_number_query.filter(label_model.log_index == None) + .order_by(label_model.block_number) + .limit(50) .all() ) - return block_numbers[0][0] if block_numbers else None + event_block_numbers = ( + block_number_query.filter(label_model.log_index != None) + .order_by(label_model.block_number) + .limit(50) + .all() + ) + + if only_events: + return event_block_numbers[0][0] if event_block_numbers else None + else: + event_block_number = event_block_numbers[0][0] if event_block_numbers else -1 + function_call_block_number = ( + function_call_block_numbers[0][0] if function_call_block_numbers else -1 + ) + max_block_number = max(event_block_number, function_call_block_number) + return max_block_number if max_block_number != -1 else None def commit_session(db_session: Session) -> None: diff --git a/crawlers/mooncrawl/mooncrawl/moonworm_crawler/historical_crawler.py b/crawlers/mooncrawl/mooncrawl/moonworm_crawler/historical_crawler.py index 1a3f2c72d..f3dca3576 100644 --- a/crawlers/mooncrawl/mooncrawl/moonworm_crawler/historical_crawler.py +++ b/crawlers/mooncrawl/mooncrawl/moonworm_crawler/historical_crawler.py @@ -62,7 +62,6 @@ def historical_crawler( while start_block >= end_block: try: - # query db with limit 1, to avoid session closing time.sleep(min_sleep_time) @@ -91,21 +90,21 @@ def historical_crawler( logger.info( f"Crawling function calls from {start_block} to {batch_end_block}" ) - all_function_calls = _crawl_functions( - blockchain_type, - ethereum_state_provider, - function_call_crawl_jobs, - batch_end_block, - start_block, - ) - logger.info( - f"Crawled {len(all_function_calls)} function calls from {start_block} to {batch_end_block}." - ) - logger.info(f"{ethereum_state_provider.metrics}") - - add_function_calls_to_session( - db_session, all_function_calls, blockchain_type - ) + if function_call_crawl_jobs: + all_function_calls = _crawl_functions( + blockchain_type, + ethereum_state_provider, + function_call_crawl_jobs, + batch_end_block, + start_block, + ) + logger.info( + f"Crawled {len(all_function_calls)} function calls from {start_block} to {batch_end_block}." + ) + + add_function_calls_to_session( + db_session, all_function_calls, blockchain_type + ) # Commiting to db commit_session(db_session) From 4166d71d4125d1895123be0ff66c2535527443d8 Mon Sep 17 00:00:00 2001 From: Yhtyyar Sahatov Date: Tue, 21 Jun 2022 14:33:45 +0300 Subject: [PATCH 04/14] added help message for `crawl` command --- crawlers/mooncrawl/mooncrawl/moonworm_crawler/cli.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/crawlers/mooncrawl/mooncrawl/moonworm_crawler/cli.py b/crawlers/mooncrawl/mooncrawl/moonworm_crawler/cli.py index d9d9250af..09bc21777 100644 --- a/crawlers/mooncrawl/mooncrawl/moonworm_crawler/cli.py +++ b/crawlers/mooncrawl/mooncrawl/moonworm_crawler/cli.py @@ -243,7 +243,10 @@ def main() -> None: subparsers = parser.add_subparsers() - crawl_parser = subparsers.add_parser("crawl") + crawl_parser = subparsers.add_parser( + "crawl", + help="continuous crawling the event/function call jobs from bugout journal", + ) crawl_parser.add_argument( "--start", From 2336dba7c93291c0526b12ab0d6cc9e84d946791 Mon Sep 17 00:00:00 2001 From: Yhtyyar Sahatov Date: Wed, 22 Jun 2022 15:34:02 +0300 Subject: [PATCH 05/14] fixed some issues --- crawlers/mooncrawl/mooncrawl/moonworm_crawler/cli.py | 2 +- crawlers/mooncrawl/mooncrawl/moonworm_crawler/event_crawler.py | 1 + .../mooncrawl/mooncrawl/moonworm_crawler/historical_crawler.py | 2 +- 3 files changed, 3 insertions(+), 2 deletions(-) diff --git a/crawlers/mooncrawl/mooncrawl/moonworm_crawler/cli.py b/crawlers/mooncrawl/mooncrawl/moonworm_crawler/cli.py index 09bc21777..05128ec2b 100644 --- a/crawlers/mooncrawl/mooncrawl/moonworm_crawler/cli.py +++ b/crawlers/mooncrawl/mooncrawl/moonworm_crawler/cli.py @@ -179,7 +179,7 @@ def handle_historical_crawl(args: argparse.Namespace) -> None: web3.middleware_onion.inject(geth_poa_middleware, layer=0) last_labeled_block = get_first_labeled_block_number( - db_session, blockchain_type, args.address + db_session, blockchain_type, args.address, only_events=args.only_events ) logger.info(f"Last labeled block: {last_labeled_block}") diff --git a/crawlers/mooncrawl/mooncrawl/moonworm_crawler/event_crawler.py b/crawlers/mooncrawl/mooncrawl/moonworm_crawler/event_crawler.py index 221dbc691..225f9a077 100644 --- a/crawlers/mooncrawl/mooncrawl/moonworm_crawler/event_crawler.py +++ b/crawlers/mooncrawl/mooncrawl/moonworm_crawler/event_crawler.py @@ -38,6 +38,7 @@ def _get_block_timestamp_from_web3( Gets the timestamp of a block from the blockchain. will raise an exception if the block is not found. """ + logger.info(f"Getting block timestamp from web3 for block {block_number}") return web3.eth.getBlock(block_number).timestamp diff --git a/crawlers/mooncrawl/mooncrawl/moonworm_crawler/historical_crawler.py b/crawlers/mooncrawl/mooncrawl/moonworm_crawler/historical_crawler.py index f3dca3576..2fb519280 100644 --- a/crawlers/mooncrawl/mooncrawl/moonworm_crawler/historical_crawler.py +++ b/crawlers/mooncrawl/mooncrawl/moonworm_crawler/historical_crawler.py @@ -79,7 +79,7 @@ def historical_crawler( from_block=batch_end_block, to_block=start_block, blocks_cache=blocks_cache, - db_block_query_batch=max_blocks_batch * 2, + db_block_query_batch=max_blocks_batch, ) logger.info( f"Crawled {len(all_events)} events from {start_block} to {batch_end_block}." From 0f110bf16aac8251eeac6778c54ff9f13a256a32 Mon Sep 17 00:00:00 2001 From: Yhtyyar Sahatov Date: Wed, 22 Jun 2022 18:18:02 +0300 Subject: [PATCH 06/14] removed unneeded log --- crawlers/mooncrawl/mooncrawl/moonworm_crawler/event_crawler.py | 1 - 1 file changed, 1 deletion(-) diff --git a/crawlers/mooncrawl/mooncrawl/moonworm_crawler/event_crawler.py b/crawlers/mooncrawl/mooncrawl/moonworm_crawler/event_crawler.py index 225f9a077..221dbc691 100644 --- a/crawlers/mooncrawl/mooncrawl/moonworm_crawler/event_crawler.py +++ b/crawlers/mooncrawl/mooncrawl/moonworm_crawler/event_crawler.py @@ -38,7 +38,6 @@ def _get_block_timestamp_from_web3( Gets the timestamp of a block from the blockchain. will raise an exception if the block is not found. """ - logger.info(f"Getting block timestamp from web3 for block {block_number}") return web3.eth.getBlock(block_number).timestamp From da78bffbd0bcbba6f9411c06e5e05e014594ae2c Mon Sep 17 00:00:00 2001 From: Yhtyyar Sahatov Date: Wed, 22 Jun 2022 18:19:23 +0300 Subject: [PATCH 07/14] removed select 1 --- .../mooncrawl/mooncrawl/moonworm_crawler/continuous_crawler.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/crawlers/mooncrawl/mooncrawl/moonworm_crawler/continuous_crawler.py b/crawlers/mooncrawl/mooncrawl/moonworm_crawler/continuous_crawler.py index 900df29d9..1ac12665e 100644 --- a/crawlers/mooncrawl/mooncrawl/moonworm_crawler/continuous_crawler.py +++ b/crawlers/mooncrawl/mooncrawl/moonworm_crawler/continuous_crawler.py @@ -153,8 +153,6 @@ def continuous_crawler( try: while True: try: - # query db with limit 1, to avoid session closing - db_session.execute("SELECT 1") time.sleep(current_sleep_time) end_block = min( From dab565be856f6ee8704f9f34f8010b0cd4c7845f Mon Sep 17 00:00:00 2001 From: Yhtyyar Sahatov Date: Mon, 25 Jul 2022 12:55:14 +0300 Subject: [PATCH 08/14] added blocklist --- crawlers/mooncrawl/mooncrawl/blockchain.py | 2 + .../mooncrawl/generic_crawler/base.py | 87 +++++++++++++++++-- .../mooncrawl/generic_crawler/cli.py | 24 ++--- .../mooncrawl/moonworm_crawler/db.py | 3 + 4 files changed, 96 insertions(+), 20 deletions(-) diff --git a/crawlers/mooncrawl/mooncrawl/blockchain.py b/crawlers/mooncrawl/mooncrawl/blockchain.py index f5f45b706..789be9a1a 100644 --- a/crawlers/mooncrawl/mooncrawl/blockchain.py +++ b/crawlers/mooncrawl/mooncrawl/blockchain.py @@ -73,6 +73,8 @@ def connect( if web3_uri.startswith("http://") or web3_uri.startswith("https://"): web3_provider = Web3.HTTPProvider(web3_uri, request_kwargs=request_kwargs) + elif web3_uri.startswith("wss://"): + web3_provider = Web3.WebsocketProvider(web3_uri) else: web3_provider = Web3.IPCProvider(web3_uri) web3_client = Web3(web3_provider) diff --git a/crawlers/mooncrawl/mooncrawl/generic_crawler/base.py b/crawlers/mooncrawl/mooncrawl/generic_crawler/base.py index d4a93d92a..db4c125b1 100644 --- a/crawlers/mooncrawl/mooncrawl/generic_crawler/base.py +++ b/crawlers/mooncrawl/mooncrawl/generic_crawler/base.py @@ -2,8 +2,10 @@ import logging import time from dataclasses import dataclass -from typing import Any, Dict, List, Optional, Set, Union - +from typing import Any, Dict, List, Optional, Set, Union, Callable +from eth_abi.codec import ABICodec +from web3._utils.events import get_event_data +from web3._utils.filters import construct_event_filter_params import web3 from eth_typing import ChecksumAddress from hexbytes.main import HexBytes @@ -19,7 +21,6 @@ ContractFunctionCall, utfy_dict, ) -from moonworm.crawler.log_scanner import _fetch_events_chunk # type: ignore from sqlalchemy.orm.session import Session from tqdm import tqdm from web3 import Web3 @@ -96,7 +97,7 @@ def add_function_calls_with_gas_price_to_session( transactions_hashes_to_save = [ function_call.transaction_hash for function_call in function_calls ] - + logger.info(f"Querrying existing labels (function call)") existing_labels = ( db_session.query(label_model.transaction_hash) .filter( @@ -106,6 +107,7 @@ def add_function_calls_with_gas_price_to_session( ) .all() ) + logger.info(f"Querry finished") existing_labels_transactions = [label[0] for label in existing_labels] @@ -152,6 +154,76 @@ def _transform_to_w3_tx( return tx +def _fetch_events_chunk( + web3, + event_abi, + from_block: int, + to_block: int, + addresses: Optional[List[ChecksumAddress]] = None, + on_decode_error: Optional[Callable[[Exception], None]] = None, + address_block_list: Optional[List[ChecksumAddress]] = None, +) -> List[Any]: + """Get events using eth_getLogs API. + + Event structure: + { + "event": Event name, + "args": dictionary of event arguments, + "address": contract address, + "blockNumber": block number, + "transactionHash": transaction hash, + "logIndex": log index + } + + """ + + if from_block is None: + raise TypeError("Missing mandatory keyword argument to getLogs: fromBlock") + + # Depending on the Solidity version used to compile + # the contract that uses the ABI, + # it might have Solidity ABI encoding v1 or v2. + # We just assume the default that you set on Web3 object here. + # More information here https://eth-abi.readthedocs.io/en/latest/index.html + codec: ABICodec = web3.codec + + _, event_filter_params = construct_event_filter_params( + event_abi, + codec, + fromBlock=from_block, + toBlock=to_block, + ) + if addresses: + event_filter_params["address"] = addresses + + logs = web3.eth.get_logs(event_filter_params) + logger.info(f"Fetched {len(logs)} raw logs") + # Convert raw binary data to Python proxy objects as described by ABI + all_events = [] + for log in logs: + if address_block_list and log["address"] in address_block_list: + continue + try: + raw_event = get_event_data(codec, event_abi, log) + event = { + "event": raw_event["event"], + "args": json.loads(Web3.toJSON(utfy_dict(dict(raw_event["args"])))), + "address": raw_event["address"], + "blockNumber": raw_event["blockNumber"], + "transactionHash": raw_event["transactionHash"].hex(), + "logIndex": raw_event["logIndex"], + } + all_events.append(event) + except Exception as e: + if address_block_list is not None: + address_block_list.append(log["address"]) + if on_decode_error: + on_decode_error(e) + continue + logger.info(f"Decoded {len(all_events)} logs") + return all_events + + def process_transaction( db_session: Session, web3: Web3, @@ -371,6 +443,7 @@ def crawl( logger.info(f"Crawling blocks {current_block}-{current_block + batch_size}") events = [] logger.info("Fetching events") + block_list = [] for event_abi in events_abi: raw_events = _fetch_events_chunk( web3, @@ -378,6 +451,7 @@ def crawl( current_block, batch_end, addresses, + address_block_list=block_list, ) for raw_event in raw_events: raw_event["blockTimestamp"] = get_block_timestamp( @@ -386,7 +460,7 @@ def crawl( blockchain_type, raw_event["blockNumber"], blocks_cache=db_blocks_cache, - max_blocks_batch=1000, + max_blocks_batch=100, ) event = _processEvent(raw_event) events.append(event) @@ -401,6 +475,7 @@ def crawl( ) logger.info(f"Fetched {len(transactions)} transactions") + logger.info(f"Processing transactions") function_calls = [] for tx in transactions: processed_tx, secondary_logs = process_transaction( @@ -414,6 +489,8 @@ def crawl( ) function_calls.append(processed_tx) events.extend(secondary_logs) + logger.info(f"Processed {len(function_calls)} transactions") + add_function_calls_with_gas_price_to_session( db_session, function_calls, diff --git a/crawlers/mooncrawl/mooncrawl/generic_crawler/cli.py b/crawlers/mooncrawl/mooncrawl/generic_crawler/cli.py index 075698ecc..ad99f328c 100644 --- a/crawlers/mooncrawl/mooncrawl/generic_crawler/cli.py +++ b/crawlers/mooncrawl/mooncrawl/generic_crawler/cli.py @@ -43,12 +43,9 @@ def handle_nft_crawler(args: argparse.Namespace) -> None: web3 = connect(blockchain_type, access_id=args.access_id) else: logger.info(f"Using web3 provider URL: {args.web3}") - web3 = Web3( - Web3.HTTPProvider(args.web3), + web3 = connect( + blockchain_type, access_id=args.access_id, web3_uri=args.web3 ) - if args.poa: - logger.info("Using PoA middleware") - web3.middleware_onion.inject(geth_poa_middleware, layer=0) last_crawled_block = get_checkpoint( db_session, blockchain_type, from_block, to_block, label ) @@ -95,12 +92,9 @@ def populate_with_erc20_transfers(args: argparse.Namespace) -> None: web3 = connect(blockchain_type, access_id=args.access_id) else: logger.info(f"Using web3 provider URL: {args.web3}") - web3 = Web3( - Web3.HTTPProvider(args.web3), + web3 = connect( + blockchain_type, access_id=args.access_id, web3_uri=args.web3 ) - if args.poa: - logger.info("Using PoA middleware") - web3.middleware_onion.inject(geth_poa_middleware, layer=0) last_crawled_block = get_checkpoint( db_session, blockchain_type, from_block, to_block, label ) @@ -120,6 +114,8 @@ def populate_with_erc20_transfers(args: argparse.Namespace) -> None: def handle_crawl(args: argparse.Namespace) -> None: + # TODO(yhtiyar): fix it + raise NotImplementedError("Deprecated for now, since blocklist is added") logger.info(f"Starting generic crawler") label = args.label_name @@ -141,12 +137,10 @@ def handle_crawl(args: argparse.Namespace) -> None: web3 = connect(blockchain_type, access_id=args.access_id) else: logger.info(f"Using web3 provider URL: {args.web3}") - web3 = Web3( - Web3.HTTPProvider(args.web3), + + web3 = connect( + blockchain_type, access_id=args.access_id, web3_uri=args.web3 ) - if args.poa: - logger.info("Using PoA middleware") - web3.middleware_onion.inject(geth_poa_middleware, layer=0) last_crawled_block = get_checkpoint( db_session, blockchain_type, from_block, to_block, label ) diff --git a/crawlers/mooncrawl/mooncrawl/moonworm_crawler/db.py b/crawlers/mooncrawl/mooncrawl/moonworm_crawler/db.py index 29c2f9f75..c82541896 100644 --- a/crawlers/mooncrawl/mooncrawl/moonworm_crawler/db.py +++ b/crawlers/mooncrawl/mooncrawl/moonworm_crawler/db.py @@ -143,6 +143,7 @@ def add_events_to_session( events_hashes_to_save = [event.transaction_hash for event in events] + logger.info(f"Querying database for existing events") existing_labels = ( db_session.query(label_model.transaction_hash, label_model.log_index) .filter( @@ -153,6 +154,8 @@ def add_events_to_session( .all() ) + logger.info(f"Querry finished") + existing_labels_transactions = [] existing_log_index_by_tx_hash: Dict[str, List[int]] = {} for label in existing_labels: From c265a53f62577918db5ef99f77c7d72e58e0a9e3 Mon Sep 17 00:00:00 2001 From: Yhtyyar Sahatov Date: Thu, 18 Aug 2022 13:58:17 +0300 Subject: [PATCH 09/14] removed tx reciept from crawling --- crawlers/mooncrawl/mooncrawl/blockchain.py | 17 ++--- .../mooncrawl/generic_crawler/base.py | 66 ++++++++----------- .../mooncrawl/generic_crawler/cli.py | 11 +++- 3 files changed, 44 insertions(+), 50 deletions(-) diff --git a/crawlers/mooncrawl/mooncrawl/blockchain.py b/crawlers/mooncrawl/mooncrawl/blockchain.py index 789be9a1a..1e9a84924 100644 --- a/crawlers/mooncrawl/mooncrawl/blockchain.py +++ b/crawlers/mooncrawl/mooncrawl/blockchain.py @@ -48,19 +48,21 @@ def connect( blockchain_type: AvailableBlockchainType, web3_uri: Optional[str] = None, access_id: Optional[UUID] = None, + request_timeout: Optional[int] = None, ) -> Web3: web3_provider: Union[IPCProvider, HTTPProvider] = Web3.IPCProvider() - request_kwargs: Any = None + request_kwargs = {} if access_id is not None: - request_kwargs = { - "headers": { - NB_ACCESS_ID_HEADER: str(access_id), - NB_DATA_SOURCE_HEADER: "blockchain", - "Content-Type": "application/json", - } + request_kwargs["headers"] = { + NB_ACCESS_ID_HEADER: str(access_id), + NB_DATA_SOURCE_HEADER: "blockchain", + "Content-Type": "application/json", } + if request_timeout is not None: + request_kwargs["timeout"] = request_timeout + if web3_uri is None: if blockchain_type == AvailableBlockchainType.ETHEREUM: web3_uri = MOONSTREAM_ETHEREUM_WEB3_PROVIDER_URI @@ -78,7 +80,6 @@ def connect( else: web3_provider = Web3.IPCProvider(web3_uri) web3_client = Web3(web3_provider) - # Inject --dev middleware if it is not Ethereum mainnet # Docs: https://web3py.readthedocs.io/en/stable/middleware.html#geth-style-proof-of-authority if blockchain_type != AvailableBlockchainType.ETHEREUM: diff --git a/crawlers/mooncrawl/mooncrawl/generic_crawler/base.py b/crawlers/mooncrawl/mooncrawl/generic_crawler/base.py index db4c125b1..e7f1aed43 100644 --- a/crawlers/mooncrawl/mooncrawl/generic_crawler/base.py +++ b/crawlers/mooncrawl/mooncrawl/generic_crawler/base.py @@ -45,13 +45,21 @@ logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) -# TODO: ADD VALUE!!! + @dataclass -class ExtededFunctionCall(ContractFunctionCall): +class ExtededFunctionCall: + block_number: int + block_timestamp: int + transaction_hash: str + contract_address: str + caller_address: str + function_name: str + function_args: Dict[str, Any] gas_price: int + value: int = 0 max_fee_per_gas: Optional[int] = None max_priority_fee_per_gas: Optional[int] = None - value: int = 0 + status: Optional[str] = None def _function_call_with_gas_price_to_label( @@ -70,8 +78,6 @@ def _function_call_with_gas_price_to_label( "name": function_call.function_name, "caller": function_call.caller_address, "args": function_call.function_args, - "status": function_call.status, - "gasUsed": function_call.gas_used, "gasPrice": function_call.gas_price, "maxFeePerGas": function_call.max_fee_per_gas, "maxPriorityFeePerGas": function_call.max_priority_fee_per_gas, @@ -232,19 +238,20 @@ def process_transaction( secondary_abi: List[Dict[str, Any]], transaction: Dict[str, Any], blocks_cache: Dict[int, int], + skip_decoding: bool = False, ): + selector = transaction["input"][:10] + function_name = selector + function_args = "unknown" + if not skip_decoding: + try: + raw_function_call = contract.decode_function_input(transaction["input"]) + function_name = raw_function_call[0].fn_name + function_args = utfy_dict(raw_function_call[1]) + except Exception as e: + pass + # logger.error(f"Failed to decode transaction : {str(e)}") - try: - raw_function_call = contract.decode_function_input(transaction["input"]) - function_name = raw_function_call[0].fn_name - function_args = utfy_dict(raw_function_call[1]) - except Exception as e: - # logger.error(f"Failed to decode transaction : {str(e)}") - selector = transaction["input"][:10] - function_name = selector - function_args = "unknown" - - transaction_reciept = web3.eth.getTransactionReceipt(transaction["hash"]) block_timestamp = get_block_timestamp( db_session, web3, @@ -262,8 +269,6 @@ def process_transaction( caller_address=transaction["from"], function_name=function_name, function_args=function_args, - status=transaction_reciept["status"], - gas_used=transaction_reciept["gasUsed"], gas_price=transaction["gasPrice"], max_fee_per_gas=transaction.get( "maxFeePerGas", @@ -272,28 +277,7 @@ def process_transaction( value=transaction["value"], ) - secondary_logs = [] - for log in transaction_reciept["logs"]: - for abi in secondary_abi: - try: - raw_event = get_event_data(web3.codec, abi, log) - event = { - "event": raw_event["event"], - "args": json.loads(Web3.toJSON(utfy_dict(dict(raw_event["args"])))), - "address": raw_event["address"], - "blockNumber": raw_event["blockNumber"], - "transactionHash": raw_event["transactionHash"].hex(), - "logIndex": raw_event["logIndex"], - "blockTimestamp": block_timestamp, - } - processed_event = _processEvent(event) - secondary_logs.append(processed_event) - - break - except: - pass - - return function_call, secondary_logs + return function_call, [] def _get_transactions( @@ -422,6 +406,7 @@ def crawl( crawl_transactions: bool = True, addresses: Optional[List[ChecksumAddress]] = None, batch_size: int = 100, + skip_decoding_transactions: bool = False, ) -> None: current_block = from_block @@ -486,6 +471,7 @@ def crawl( secondary_abi, tx, db_blocks_cache, + skip_decoding=skip_decoding_transactions, ) function_calls.append(processed_tx) events.extend(secondary_logs) diff --git a/crawlers/mooncrawl/mooncrawl/generic_crawler/cli.py b/crawlers/mooncrawl/mooncrawl/generic_crawler/cli.py index ad99f328c..46dce14b0 100644 --- a/crawlers/mooncrawl/mooncrawl/generic_crawler/cli.py +++ b/crawlers/mooncrawl/mooncrawl/generic_crawler/cli.py @@ -44,7 +44,10 @@ def handle_nft_crawler(args: argparse.Namespace) -> None: else: logger.info(f"Using web3 provider URL: {args.web3}") web3 = connect( - blockchain_type, access_id=args.access_id, web3_uri=args.web3 + blockchain_type, + access_id=args.access_id, + web3_uri=args.web3, + request_timeout=60, ) last_crawled_block = get_checkpoint( db_session, blockchain_type, from_block, to_block, label @@ -61,6 +64,7 @@ def handle_nft_crawler(args: argparse.Namespace) -> None: from_block=last_crawled_block, to_block=to_block, batch_size=args.max_blocks_batch, + skip_decoding_transactions=True, ) @@ -93,7 +97,10 @@ def populate_with_erc20_transfers(args: argparse.Namespace) -> None: else: logger.info(f"Using web3 provider URL: {args.web3}") web3 = connect( - blockchain_type, access_id=args.access_id, web3_uri=args.web3 + blockchain_type, + access_id=args.access_id, + web3_uri=args.web3, + request_timeout=60, ) last_crawled_block = get_checkpoint( db_session, blockchain_type, from_block, to_block, label From ea4e4859858621f695dbb59c54d7de3bb6c8a2b2 Mon Sep 17 00:00:00 2001 From: Yhtyyar Sahatov Date: Thu, 18 Aug 2022 13:59:32 +0300 Subject: [PATCH 10/14] removed gasUsed from dataset --- datasets/nfts/nfts/data.py | 2 -- datasets/nfts/nfts/datastore.py | 19 ++++++++----------- datasets/nfts/nfts/materialize.py | 4 +--- datasets/nfts/setup.py | 1 - 4 files changed, 9 insertions(+), 17 deletions(-) diff --git a/datasets/nfts/nfts/data.py b/datasets/nfts/nfts/data.py index 9a5debeaf..1ca9d1dc2 100644 --- a/datasets/nfts/nfts/data.py +++ b/datasets/nfts/nfts/data.py @@ -23,10 +23,8 @@ class NftTransaction: caller_address: str function_name: str function_args: Union[Dict[str, Any], str] - gas_used: int gas_price: int value: int - status: int max_fee_per_gas: Optional[int] = None max_priority_fee_per_gas: Optional[int] = None diff --git a/datasets/nfts/nfts/datastore.py b/datasets/nfts/nfts/datastore.py index 566fa8dae..c52dc2a27 100644 --- a/datasets/nfts/nfts/datastore.py +++ b/datasets/nfts/nfts/datastore.py @@ -34,7 +34,6 @@ def create_transactions_table_query(tabel_name) -> str: functionName TEXT NOT NULL, functionArgs JSON NOT NULL, value INTEGER NOT NULL, - gasUsed INTEGER NOT NULL, gasPrice INTEGER NOT NULL, maxFeePerGas INTEGER, maxPriorityFeePerGas INTEGER, @@ -125,14 +124,13 @@ def insertTransactionQuery(tabel_name): functionName, functionArgs, value, - gasUsed, gasPrice, maxFeePerGas, maxPriorityFeePerGas ) VALUES ( - ?,?,?,?,?,?,?,?,?,?,?,?,? + ?,?,?,?,?,?,?,?,?,?,?,? ); """ return query @@ -240,7 +238,6 @@ def nft_transaction_to_tuple(nft_transaction: NftTransaction) -> Tuple[Any]: nft_transaction.function_name, json.dumps(nft_transaction.function_args), str(nft_transaction.value), - str(nft_transaction.gas_used), str(nft_transaction.gas_price), str(nft_transaction.max_fee_per_gas), str(nft_transaction.max_priority_fee_per_gas), @@ -356,7 +353,7 @@ def insert_events( raise ValueError(f"Unknown event type: {type(event)}") if len(nft_transfers) > 0: - query = insert_nft_transfers_query("transfers") + query = insert_nft_transfers_query("erc721_transfers") cur.executemany( query, nft_transfers, @@ -405,13 +402,13 @@ def setup_database(conn: sqlite3.Connection) -> None: cur.execute(create_transactions_table_query("transactions")) cur.execute(create_approvals_table_query("approvals")) cur.execute(create_approval_for_all_table_query("approvals_for_all")) - cur.execute(create_transfers_table_query("transfers")) + cur.execute(create_transfers_table_query("erc721_transfers")) cur.execute(create_erc20_transfers_table_query("erc20_transfers")) - cur.execute(create_blockchain_type_index_query("transactions")) - cur.execute(create_blockchain_type_index_query("approvals")) - cur.execute(create_blockchain_type_index_query("approvals_for_all")) - cur.execute(create_blockchain_type_index_query("transfers")) - cur.execute(create_blockchain_type_index_query("erc20_transfers")) + # cur.execute(create_blockchain_type_index_query("transactions")) + # cur.execute(create_blockchain_type_index_query("approvals")) + # cur.execute(create_blockchain_type_index_query("approvals_for_all")) + # cur.execute(create_blockchain_type_index_query("erc721_transfers")) + # cur.execute(create_blockchain_type_index_query("erc20_transfers")) conn.commit() diff --git a/datasets/nfts/nfts/materialize.py b/datasets/nfts/nfts/materialize.py index 30791434f..1f678080b 100644 --- a/datasets/nfts/nfts/materialize.py +++ b/datasets/nfts/nfts/materialize.py @@ -2,7 +2,7 @@ import sqlite3 from typing import Any, Dict, Union, cast, Iterator, List, Optional, Set import json -from attr import dataclass +from dataclasses import dataclass from moonstreamdb.models import ( EthereumLabel, @@ -70,10 +70,8 @@ def parse_transaction_label( caller_address=label_model.label_data["caller"], function_name=label_model.label_data["name"], function_args=label_model.label_data["args"], - gas_used=label_model.label_data["gasUsed"], gas_price=label_model.label_data["gasPrice"], value=value, - status=label_model.label_data["status"], max_fee_per_gas=label_model.label_data["maxFeePerGas"], max_priority_fee_per_gas=label_model.label_data["maxPriorityFeePerGas"], ) diff --git a/datasets/nfts/setup.py b/datasets/nfts/setup.py index a641f8423..251116940 100644 --- a/datasets/nfts/setup.py +++ b/datasets/nfts/setup.py @@ -39,7 +39,6 @@ "requests", "scipy", "tqdm", - "web3", ], extras_require={ "dev": ["black", "mypy", "types-requests"], From 286f58f95acaacaac31ffb30fe61e60f42b06092 Mon Sep 17 00:00:00 2001 From: Yhtyyar Sahatov Date: Thu, 18 Aug 2022 18:51:37 +0300 Subject: [PATCH 11/14] added threadpool workers --- datasets/nfts/nfts/cli.py | 1 + datasets/nfts/nfts/materialize.py | 114 ++++++++++++------------------ 2 files changed, 48 insertions(+), 67 deletions(-) diff --git a/datasets/nfts/nfts/cli.py b/datasets/nfts/nfts/cli.py index cb4d3b2ec..950edcc7a 100644 --- a/datasets/nfts/nfts/cli.py +++ b/datasets/nfts/nfts/cli.py @@ -98,6 +98,7 @@ def handle_materialize(args: argparse.Namespace) -> None: start_block=bounds.starting_block, end_block=bounds.ending_block, batch_size=args.batch_size, + blockchain_type=args.blockchain, ) diff --git a/datasets/nfts/nfts/materialize.py b/datasets/nfts/nfts/materialize.py index 1f678080b..7391d5c41 100644 --- a/datasets/nfts/nfts/materialize.py +++ b/datasets/nfts/nfts/materialize.py @@ -1,7 +1,13 @@ import logging import sqlite3 -from typing import Any, Dict, Union, cast, Iterator, List, Optional, Set +from typing import Any, Dict, Tuple, Union, cast, Iterator, List, Optional, Set import json +from concurrent.futures import ( + ProcessPoolExecutor, + as_completed, + ThreadPoolExecutor, + Future, +) from dataclasses import dataclass from moonstreamdb.models import ( @@ -45,16 +51,8 @@ def _get_last_labeled_erc721_block( def parse_transaction_label( - label_model: Union[EthereumLabel, PolygonLabel] + label_model: Union[EthereumLabel, PolygonLabel], blockchain_type: str ) -> NftTransaction: - assert ( - label_model.label_data["type"] == "tx_call" - ), "Expected label to be of type 'tx_call'" - - if isinstance(label_model, EthereumLabel): - blockchain_type = "ethereum" - else: - blockchain_type = "polygon" # TODO: this is done because I forgot to add value in polygon labels value = 0 @@ -78,19 +76,9 @@ def parse_transaction_label( def _parse_transfer_event( - label_model: Union[EthereumLabel, PolygonLabel] + label_model: Union[EthereumLabel, PolygonLabel], blockchain_type: str ) -> NftTransferEvent: - assert ( - label_model.label_data["type"] == "event" - ), "Expected label to be of type 'event'" - assert ( - label_model.label_data["name"] == "Transfer" - ), "Expected label to be of type 'Transfer'" - - if isinstance(label_model, EthereumLabel): - blockchain_type = "ethereum" - else: - blockchain_type = "polygon" + if label_model.label_data["args"].get("tokenId") is not None: return NftTransferEvent( blockchain_type=blockchain_type, @@ -114,19 +102,9 @@ def _parse_transfer_event( def _parse_approval_event( - label_model: Union[EthereumLabel, PolygonLabel] + label_model: Union[EthereumLabel, PolygonLabel], blockchain_type: str ) -> NftApprovalEvent: - assert ( - label_model.label_data["type"] == "event" - ), "Expected label to be of type 'event'" - assert ( - label_model.label_data["name"] == "Approval" - ), "Expected label to be of type 'Approval'" - - if isinstance(label_model, EthereumLabel): - blockchain_type = "ethereum" - else: - blockchain_type = "polygon" + return NftApprovalEvent( blockchain_type=blockchain_type, token_address=label_model.address, @@ -139,19 +117,8 @@ def _parse_approval_event( def _parse_approval_for_all_event( - label_model: Union[EthereumLabel, PolygonLabel] + label_model: Union[EthereumLabel, PolygonLabel], blockchain_type: str ) -> NftApprovalForAllEvent: - assert ( - label_model.label_data["type"] == "event" - ), "Expected label to be of type 'event'" - assert ( - label_model.label_data["name"] == "ApprovalForAll" - ), "Expected label to be of type 'ApprovalForAll'" - - if isinstance(label_model, EthereumLabel): - blockchain_type = "ethereum" - else: - blockchain_type = "polygon" return NftApprovalForAllEvent( blockchain_type=blockchain_type, token_address=label_model.address, @@ -164,14 +131,14 @@ def _parse_approval_for_all_event( def parse_event( - label_model: Union[EthereumLabel, PolygonLabel] + label_model: Union[EthereumLabel, PolygonLabel], blockchain_type: str ) -> Union[NftTransferEvent, NftApprovalEvent, NftApprovalForAllEvent]: if label_model.label_data["name"] == "Transfer": - return _parse_transfer_event(label_model) + return _parse_transfer_event(label_model, blockchain_type) elif label_model.label_data["name"] == "Approval": - return _parse_approval_event(label_model) + return _parse_approval_event(label_model, blockchain_type) elif label_model.label_data["name"] == "ApprovalForAll": - return _parse_approval_for_all_event(label_model) + return _parse_approval_for_all_event(label_model, blockchain_type) else: raise ValueError(f"Unknown label type: {label_model.label_data['name']}") @@ -182,6 +149,7 @@ def crawl_erc721_labels( label_model: Union[EthereumLabel, PolygonLabel], start_block: int, end_block: int, + blockchain_type: str, batch_size: int = 10000, ): logger.info( @@ -191,26 +159,25 @@ def crawl_erc721_labels( pbar.set_description( f"Crawling {label_model.__tablename__} blocks {start_block}-{end_block}" ) - current_block = start_block - while current_block <= end_block: - batch_end = min(current_block + batch_size, end_block) - logger.info(f"Crawling {current_block}-{batch_end}") - labels = db_session.query(label_model).filter( - and_( - label_model.block_number >= current_block, - label_model.block_number <= batch_end, - or_( - label_model.label == ERC721_LABEL, label_model.label == ERC20_LABEL - ), + + def _crawl(from_block, to_block): + labels = ( + db_session.query(label_model) + .filter( + and_( + label_model.block_number >= from_block, + label_model.block_number <= to_block, + or_( + label_model.label == ERC721_LABEL, + label_model.label == ERC20_LABEL, + ), + ) ) + .all() ) - - logger.info(f"Found {labels.count()} labels") - transactions = [] events = [] for label in labels: - if label.label_data["type"] == "tx_call": transactions.append(parse_transaction_label(label)) else: @@ -220,6 +187,19 @@ def crawl_erc721_labels( insert_transactions(conn, transactions) insert_events(conn, events) logger.info(f"Saved {len(events)} events and {len(transactions)} transactions") - pbar.update(batch_end - current_block + 1) - current_block = batch_end + 1 + current_block = start_block + + futures: Dict[Future, Tuple[int, int]] = {} + with ThreadPoolExecutor(max_workers=10) as executor: + while current_block <= end_block: + batch_end = min(current_block + batch_size, end_block) + future = executor.submit(_crawl, current_block, batch_end) + print(f"Submitted {current_block}-{batch_end}") + futures[future] = (current_block, batch_end) + current_block = batch_end + 1 + + for future in as_completed(futures): + from_block, to_block = futures[future] + print(f"Crawled {from_block}-{to_block}") + pbar.update(to_block - from_block + 1) From 2a93aeb6549696e27a9704ef401be3d1bbe0170f Mon Sep 17 00:00:00 2001 From: Yhtyyar Sahatov Date: Thu, 18 Aug 2022 20:04:14 +0300 Subject: [PATCH 12/14] working threadpool --- datasets/nfts/nfts/cli.py | 2 +- datasets/nfts/nfts/materialize.py | 62 +++++++++++++++++++++---------- 2 files changed, 44 insertions(+), 20 deletions(-) diff --git a/datasets/nfts/nfts/cli.py b/datasets/nfts/nfts/cli.py index 950edcc7a..8785902f0 100644 --- a/datasets/nfts/nfts/cli.py +++ b/datasets/nfts/nfts/cli.py @@ -93,7 +93,7 @@ def handle_materialize(args: argparse.Namespace) -> None: crawl_erc721_labels( db_session, - moonstream_datastore, + args.datastore, label_model, start_block=bounds.starting_block, end_block=bounds.ending_block, diff --git a/datasets/nfts/nfts/materialize.py b/datasets/nfts/nfts/materialize.py index 7391d5c41..0755ebb64 100644 --- a/datasets/nfts/nfts/materialize.py +++ b/datasets/nfts/nfts/materialize.py @@ -1,3 +1,4 @@ +import contextlib import logging import sqlite3 from typing import Any, Dict, Tuple, Union, cast, Iterator, List, Optional, Set @@ -7,6 +8,7 @@ as_completed, ThreadPoolExecutor, Future, + wait, ) from dataclasses import dataclass @@ -145,7 +147,7 @@ def parse_event( def crawl_erc721_labels( db_session: Session, - conn: sqlite3.Connection, + datastore: str, label_model: Union[EthereumLabel, PolygonLabel], start_block: int, end_block: int, @@ -160,7 +162,7 @@ def crawl_erc721_labels( f"Crawling {label_model.__tablename__} blocks {start_block}-{end_block}" ) - def _crawl(from_block, to_block): + def _crawl(from_block, to_block) -> bool: labels = ( db_session.query(label_model) .filter( @@ -179,27 +181,49 @@ def _crawl(from_block, to_block): events = [] for label in labels: if label.label_data["type"] == "tx_call": - transactions.append(parse_transaction_label(label)) + transactions.append(parse_transaction_label(label, blockchain_type)) else: - events.append(parse_event(label)) + events.append(parse_event(label, blockchain_type)) logger.info(f"Parsed {len(events)} events and {len(transactions)} transactions") - insert_transactions(conn, transactions) - insert_events(conn, events) + with contextlib.closing(sqlite3.connect(datastore)) as conn: + insert_transactions(conn, transactions) + insert_events(conn, events) logger.info(f"Saved {len(events)} events and {len(transactions)} transactions") + return True + + def crawl_with_threads(ranges: List[Tuple[int, int]]): + futures: Dict[Future, Tuple[int, int]] = {} + with ThreadPoolExecutor(max_workers=20) as executor: + for from_block, to_block in ranges: + future = executor.submit(_crawl, from_block, to_block) + futures[future] = (from_block, to_block) + + for future in as_completed(futures): + from_block, to_block = futures[future] + logger.info(f"Crawled {from_block}-{to_block}") + if future.exception() is not None: + logger.error( + f"Error crawling {from_block}-{to_block}", future.exception() + ) + + wait(list(futures.keys())) current_block = start_block - futures: Dict[Future, Tuple[int, int]] = {} - with ThreadPoolExecutor(max_workers=10) as executor: - while current_block <= end_block: - batch_end = min(current_block + batch_size, end_block) - future = executor.submit(_crawl, current_block, batch_end) - print(f"Submitted {current_block}-{batch_end}") - futures[future] = (current_block, batch_end) - current_block = batch_end + 1 - - for future in as_completed(futures): - from_block, to_block = futures[future] - print(f"Crawled {from_block}-{to_block}") - pbar.update(to_block - from_block + 1) + while current_block <= end_block: + batch_end = min(current_block + batch_size * 10, end_block) + + # divide into batches with batch_size + ranges = [] + batch_start = current_block + while batch_start <= batch_end: + ranges.append((batch_start, min(batch_start + batch_size, batch_end))) + batch_start += batch_size + 1 + + print(f"Crawling {len(ranges)} ranges") + print(ranges) + crawl_with_threads(ranges) + print(f"Crawled") + pbar.update(batch_end - current_block + 1) + current_block = batch_end + 1 From a4493dcef511df3d02b331f3f0acd792fc7d78f4 Mon Sep 17 00:00:00 2001 From: Yhtyyar Sahatov Date: Thu, 18 Aug 2022 20:17:12 +0300 Subject: [PATCH 13/14] added timeout to db conn --- datasets/nfts/nfts/materialize.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datasets/nfts/nfts/materialize.py b/datasets/nfts/nfts/materialize.py index 0755ebb64..7a13827f3 100644 --- a/datasets/nfts/nfts/materialize.py +++ b/datasets/nfts/nfts/materialize.py @@ -186,7 +186,7 @@ def _crawl(from_block, to_block) -> bool: events.append(parse_event(label, blockchain_type)) logger.info(f"Parsed {len(events)} events and {len(transactions)} transactions") - with contextlib.closing(sqlite3.connect(datastore)) as conn: + with contextlib.closing(sqlite3.connect(datastore, timeout=200)) as conn: insert_transactions(conn, transactions) insert_events(conn, events) logger.info(f"Saved {len(events)} events and {len(transactions)} transactions") From a17a7ce77a7f177fcb121249caf1fdff19cc274d Mon Sep 17 00:00:00 2001 From: Yhtyyar Sahatov Date: Fri, 9 Sep 2022 01:04:51 +0300 Subject: [PATCH 14/14] small fixes to dataset materialize --- datasets/nfts/nfts/materialize.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/datasets/nfts/nfts/materialize.py b/datasets/nfts/nfts/materialize.py index 7a13827f3..1f3a6ab24 100644 --- a/datasets/nfts/nfts/materialize.py +++ b/datasets/nfts/nfts/materialize.py @@ -34,7 +34,7 @@ logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) -ERC721_LABEL = "erc721" +ERC721_LABEL = "erc721-v2" ERC20_LABEL = "test-erc20" @@ -194,7 +194,7 @@ def _crawl(from_block, to_block) -> bool: def crawl_with_threads(ranges: List[Tuple[int, int]]): futures: Dict[Future, Tuple[int, int]] = {} - with ThreadPoolExecutor(max_workers=20) as executor: + with ThreadPoolExecutor(max_workers=30) as executor: for from_block, to_block in ranges: future = executor.submit(_crawl, from_block, to_block) futures[future] = (from_block, to_block)