diff --git a/apps/arweave/src/ar_data_sync.erl b/apps/arweave/src/ar_data_sync.erl index 66fc83f8b..093d787d5 100644 --- a/apps/arweave/src/ar_data_sync.erl +++ b/apps/arweave/src/ar_data_sync.erl @@ -1050,7 +1050,9 @@ handle_cast(collect_peer_intervals, State) -> #sync_data_state{ range_start = Start, range_end = End, disk_pool_threshold = DiskPoolThreshold, sync_phase = SyncPhase, - migrations_index = MI } = State, + migrations_index = MI, + store_id = StoreID, + sync_intervals_queue = Q } = State, CheckIsJoined = case ar_node:is_joined() of false -> @@ -1090,8 +1092,9 @@ handle_cast(collect_peer_intervals, State) -> end, ?LOG_DEBUG([{event, collect_peer_intervals_start}, {function, collect_peer_intervals}, - {store_id, State#sync_data_state.store_id}, + {store_id, StoreID}, {s, Start}, {e, End}, + {queue_size, gb_sets:size(Q)}, {is_joined, CheckIsJoined}, {is_footprint_record_migrated, IsFootprintRecordMigrated}, {intersects_disk_pool, IntersectsDiskPool}, @@ -1204,13 +1207,6 @@ handle_cast({collect_peer_intervals, Offset, Start, End, Type}, State) -> false -> %% All checks have passed, find and enqueue intervals for one %% sync bucket worth of chunks starting at offset Start. - Footprint = - case Type of - footprint -> - ar_footprint_record:get_footprint(Offset + ?DATA_CHUNK_SIZE); - _ -> - ignore - end, ar_peer_intervals:fetch(Offset, Start, End2, StoreID, Type) end end, diff --git a/apps/arweave/src/ar_peer_intervals.erl b/apps/arweave/src/ar_peer_intervals.erl index 11c232198..e4c84f481 100644 --- a/apps/arweave/src/ar_peer_intervals.erl +++ b/apps/arweave/src/ar_peer_intervals.erl @@ -52,45 +52,36 @@ fetch(Offset, Start, End, StoreID, Type) when Offset >= End -> fetch(Offset, Start, End, StoreID, Type) -> Parent = ar_data_sync:name(StoreID), spawn_link(fun() -> - {End2, EnqueueIntervals} = do_fetch(Offset, Start, End, StoreID, Type), - gen_server:cast(Parent, {enqueue_intervals, EnqueueIntervals}), - gen_server:cast(Parent, {collect_peer_intervals, End2, Start, End, Type}) + case do_fetch(Offset, Start, End, StoreID, Type) of + {End2, EnqueueIntervals} -> + gen_server:cast(Parent, {enqueue_intervals, EnqueueIntervals}), + gen_server:cast(Parent, {collect_peer_intervals, End2, Start, End, Type}); + wait -> + ar_util:cast_after(1000, Parent, + {collect_peer_intervals, Offset, Start, End, Type}) + end end). do_fetch(Offset, Start, End, StoreID, normal) -> Parent = ar_data_sync:name(StoreID), try - End2 = min(Offset + ?QUERY_RANGE_STEP_SIZE, End), - UnsyncedIntervals = get_unsynced_intervals(Offset, End2, StoreID), - - Bucket = Offset div ?NETWORK_DATA_BUCKET_SIZE, - {ok, Config} = arweave_config:get_env(), - AllPeers = - case Config#config.sync_from_local_peers_only of - true -> - Config#config.local_peers; - false -> - ar_data_discovery:get_bucket_peers(Bucket) - end, - HotPeers = [ - Peer || Peer <- AllPeers, - not ar_rate_limiter:is_on_cooldown(Peer, ?GET_SYNC_RECORD_RPM_KEY) andalso - not ar_rate_limiter:is_throttled(Peer, ?GET_SYNC_RECORD_PATH) - ], - Peers = ar_data_discovery:pick_peers(HotPeers, ?QUERY_BEST_PEERS_COUNT), - - {End4, EnqueueIntervals} = - case ar_intervals:is_empty(UnsyncedIntervals) of - true -> - {End2, []}; - false -> - {End3, EnqueueIntervals2} = - fetch_peer_intervals(Parent, Offset, Peers, UnsyncedIntervals), - {min(End2, End3), EnqueueIntervals2} - end, - %% Schedule the next sync bucket. The cast handler logic will pause collection - %% if needed. - {End4, EnqueueIntervals} + case get_peers(Offset, normal) of + wait -> + wait; + Peers -> + End2 = min(Offset + ?QUERY_RANGE_STEP_SIZE, End), + UnsyncedIntervals = get_unsynced_intervals(Offset, End2, StoreID), + %% Schedule the next sync bucket. The cast handler logic will pause collection + %% if needed. + case ar_intervals:is_empty(UnsyncedIntervals) of + true -> + {End2, []}; + false -> + {End3, EnqueueIntervals2} = + fetch_peer_intervals(Parent, Offset, Peers, UnsyncedIntervals), + {min(End2, End3), EnqueueIntervals2} + end + end catch Class:Reason:Stacktrace -> ?LOG_WARNING([{event, fetch_peers_process_exit}, @@ -108,39 +99,27 @@ do_fetch(Offset, Start, End, StoreID, normal) -> do_fetch(Offset, Start, End, StoreID, footprint) -> Parent = ar_data_sync:name(StoreID), try - Partition = ar_replica_2_9:get_entropy_partition(Offset + ?DATA_CHUNK_SIZE), - Footprint = ar_footprint_record:get_footprint(Offset + ?DATA_CHUNK_SIZE), - - FootprintBucket = ar_footprint_record:get_footprint_bucket(Offset + ?DATA_CHUNK_SIZE), - {ok, Config} = arweave_config:get_env(), - AllPeers = - case Config#config.sync_from_local_peers_only of - true -> - Config#config.local_peers; - false -> - ar_data_discovery:get_footprint_bucket_peers(FootprintBucket) - end, - HotPeers = [ - Peer || Peer <- AllPeers, - not ar_rate_limiter:is_on_cooldown(Peer, ?GET_FOOTPRINT_RECORD_RPM_KEY) andalso - not ar_rate_limiter:is_throttled(Peer, ?GET_FOOTPRINT_RECORD_PATH) - ], - Peers = ar_data_discovery:pick_peers(HotPeers, ?QUERY_BEST_PEERS_COUNT), - - UnsyncedIntervals = - ar_footprint_record:get_unsynced_intervals(Partition, Footprint, StoreID), - - EnqueueIntervals = - case ar_intervals:is_empty(UnsyncedIntervals) of - true -> - []; - false -> - fetch_peer_footprint_intervals( - Parent, Partition, Footprint, Offset, End, Peers, UnsyncedIntervals) - end, - Offset2 = get_next_fetch_offset(Offset, Start, End), - %% Schedule the next sync bucket. The cast handler logic will pause collection if needed. - {Offset2, EnqueueIntervals} + case get_peers(Offset, footprint) of + wait -> + wait; + Peers -> + Partition = ar_replica_2_9:get_entropy_partition(Offset + ?DATA_CHUNK_SIZE), + Footprint = ar_footprint_record:get_footprint(Offset + ?DATA_CHUNK_SIZE), + UnsyncedIntervals = + ar_footprint_record:get_unsynced_intervals(Partition, Footprint, StoreID), + + EnqueueIntervals = + case ar_intervals:is_empty(UnsyncedIntervals) of + true -> + []; + false -> + fetch_peer_footprint_intervals( + Parent, Partition, Footprint, Offset, End, Peers, UnsyncedIntervals) + end, + Offset2 = get_next_fetch_offset(Offset, Start, End), + %% Schedule the next sync bucket. The cast handler logic will pause collection if needed. + {Offset2, EnqueueIntervals} + end catch Class:Reason:Stacktrace -> ?LOG_WARNING([{event, fetch_footprint_intervals_process_exit}, @@ -177,6 +156,42 @@ get_next_fetch_offset(Offset, Start, End) -> %%% Private functions. %%%=================================================================== +get_peers(Offset, normal) -> + Bucket = Offset div ?NETWORK_DATA_BUCKET_SIZE, + get_peers2(Bucket, + fun(B) -> ar_data_discovery:get_bucket_peers(B) end, + ?GET_SYNC_RECORD_RPM_KEY, + ?GET_SYNC_RECORD_PATH); +get_peers(Offset, footprint) -> + FootprintBucket = ar_footprint_record:get_footprint_bucket(Offset + ?DATA_CHUNK_SIZE), + get_peers2(FootprintBucket, + fun(B) -> ar_data_discovery:get_footprint_bucket_peers(B) end, + ?GET_FOOTPRINT_RECORD_RPM_KEY, + ?GET_FOOTPRINT_RECORD_PATH). + +get_peers2(Bucket, GetPeersFun, RPMKey, Path) -> + {ok, Config} = arweave_config:get_env(), + AllPeers = + case Config#config.sync_from_local_peers_only of + true -> + Config#config.local_peers; + false -> + GetPeersFun(Bucket) + end, + HotPeers = [ + Peer || Peer <- AllPeers, + not ar_rate_limiter:is_on_cooldown(Peer, RPMKey) andalso + not ar_rate_limiter:is_throttled(Peer, Path) + ], + case length(AllPeers) > 0 andalso length(HotPeers) == 0 of + true -> + % There are peers for this Offset, but they are all on cooldown/throttled, so + % we'll give them time to recover. + wait; + false -> + ar_data_discovery:pick_peers(HotPeers, ?QUERY_BEST_PEERS_COUNT) + end. + %% @doc Collect the unsynced intervals between Start and End excluding the blocklisted %% intervals. get_unsynced_intervals(Start, End, StoreID) ->