From 535f343e1bde62e87cb870fed5a634293892c757 Mon Sep 17 00:00:00 2001
From: James Piechota
Date: Wed, 4 Feb 2026 15:52:20 -0500
Subject: [PATCH] fix: if there are peers with the data we seek, but all of
 them are in cooldown - wait and try the range again

This avoids a death loop that can happen when there aren't many peers
available for a range. If they all get throttled, the node will iterate
through the collect_peer_intervals loop very fast and will likely trip
the rate limit again as soon as it is lifted - but each time it will be
at a different point in the loop, which can mean it takes forever to
enqueue all intervals for the range.

Now, with the wait, the node will continue to march methodically through
the range, waiting as necessary but never skipping ahead.
---
 apps/arweave/src/ar_data_sync.erl      |  14 +--
 apps/arweave/src/ar_peer_intervals.erl | 156 ++++++++++++++----------
 2 files changed, 94 insertions(+), 76 deletions(-)

diff --git a/apps/arweave/src/ar_data_sync.erl b/apps/arweave/src/ar_data_sync.erl
index 66fc83f8b..093d787d5 100644
--- a/apps/arweave/src/ar_data_sync.erl
+++ b/apps/arweave/src/ar_data_sync.erl
@@ -1050,7 +1050,9 @@ handle_cast(collect_peer_intervals, State) ->
 	#sync_data_state{ range_start = Start, range_end = End,
 			disk_pool_threshold = DiskPoolThreshold,
 			sync_phase = SyncPhase,
-			migrations_index = MI } = State,
+			migrations_index = MI,
+			store_id = StoreID,
+			sync_intervals_queue = Q } = State,
 	CheckIsJoined =
 		case ar_node:is_joined() of
 			false ->
@@ -1090,8 +1092,9 @@ handle_cast(collect_peer_intervals, State) ->
 		end,
 	?LOG_DEBUG([{event, collect_peer_intervals_start},
 		{function, collect_peer_intervals},
-		{store_id, State#sync_data_state.store_id},
+		{store_id, StoreID},
 		{s, Start}, {e, End},
+		{queue_size, gb_sets:size(Q)},
 		{is_joined, CheckIsJoined},
 		{is_footprint_record_migrated, IsFootprintRecordMigrated},
 		{intersects_disk_pool, IntersectsDiskPool},
@@ -1204,13 +1207,6 @@ handle_cast({collect_peer_intervals, Offset, Start, End, Type}, State) ->
 		false ->
 			%% All checks have passed, find and enqueue intervals for one
 			%% sync bucket worth of chunks starting at offset Start.
-			Footprint =
-				case Type of
-					footprint ->
-						ar_footprint_record:get_footprint(Offset + ?DATA_CHUNK_SIZE);
-					_ ->
-						ignore
-				end,
 			ar_peer_intervals:fetch(Offset, Start, End2, StoreID, Type)
 		end
 	end,
diff --git a/apps/arweave/src/ar_peer_intervals.erl b/apps/arweave/src/ar_peer_intervals.erl
index 11c232198..e4c84f481 100644
--- a/apps/arweave/src/ar_peer_intervals.erl
+++ b/apps/arweave/src/ar_peer_intervals.erl
@@ -52,45 +52,40 @@ fetch(Offset, Start, End, StoreID, Type) when Offset >= End ->
 fetch(Offset, Start, End, StoreID, Type) ->
 	Parent = ar_data_sync:name(StoreID),
 	spawn_link(fun() ->
-		{End2, EnqueueIntervals} = do_fetch(Offset, Start, End, StoreID, Type),
-		gen_server:cast(Parent, {enqueue_intervals, EnqueueIntervals}),
-		gen_server:cast(Parent, {collect_peer_intervals, End2, Start, End, Type})
+		case do_fetch(Offset, Start, End, StoreID, Type) of
+			{End2, EnqueueIntervals} ->
+				gen_server:cast(Parent, {enqueue_intervals, EnqueueIntervals}),
+				gen_server:cast(Parent, {collect_peer_intervals, End2, Start, End, Type});
+			wait ->
+				ar_util:cast_after(1000, Parent,
+					{collect_peer_intervals, Offset, Start, End, Type})
+		end
 	end).
 
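+%% Find the unsynced intervals in the query step that starts at Offset and
+%% pair them with peers that advertise the data. Returns {NextOffset,
+%% Intervals}, or the atom wait when every discovered peer is currently rate
+%% limited - the caller then retries the same Offset after a delay.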
 do_fetch(Offset, Start, End, StoreID, normal) ->
 	Parent = ar_data_sync:name(StoreID),
 	try
-		End2 = min(Offset + ?QUERY_RANGE_STEP_SIZE, End),
-		UnsyncedIntervals = get_unsynced_intervals(Offset, End2, StoreID),
-
-		Bucket = Offset div ?NETWORK_DATA_BUCKET_SIZE,
-		{ok, Config} = arweave_config:get_env(),
-		AllPeers =
-			case Config#config.sync_from_local_peers_only of
-				true ->
-					Config#config.local_peers;
-				false ->
-					ar_data_discovery:get_bucket_peers(Bucket)
-			end,
-		HotPeers = [
-			Peer || Peer <- AllPeers,
-			not ar_rate_limiter:is_on_cooldown(Peer, ?GET_SYNC_RECORD_RPM_KEY) andalso
-			not ar_rate_limiter:is_throttled(Peer, ?GET_SYNC_RECORD_PATH)
-		],
-		Peers = ar_data_discovery:pick_peers(HotPeers, ?QUERY_BEST_PEERS_COUNT),
-
-		{End4, EnqueueIntervals} =
-			case ar_intervals:is_empty(UnsyncedIntervals) of
-				true ->
-					{End2, []};
-				false ->
-					{End3, EnqueueIntervals2} =
-						fetch_peer_intervals(Parent, Offset, Peers, UnsyncedIntervals),
-					{min(End2, End3), EnqueueIntervals2}
-			end,
-		%% Schedule the next sync bucket. The cast handler logic will pause collection
-		%% if needed.
-		{End4, EnqueueIntervals}
+		case get_peers(Offset, normal) of
+			wait ->
+				wait;
+			Peers ->
+				End2 = min(Offset + ?QUERY_RANGE_STEP_SIZE, End),
+				UnsyncedIntervals = get_unsynced_intervals(Offset, End2, StoreID),
+				%% Schedule the next sync bucket. The cast handler logic will pause collection
+				%% if needed.
+				case ar_intervals:is_empty(UnsyncedIntervals) of
+					true ->
+						{End2, []};
+					false ->
+						{End3, EnqueueIntervals2} =
+							fetch_peer_intervals(Parent, Offset, Peers, UnsyncedIntervals),
+						{min(End2, End3), EnqueueIntervals2}
+				end
+		end
 	catch Class:Reason:Stacktrace ->
 		?LOG_WARNING([{event, fetch_peers_process_exit},
@@ -108,39 +103,30 @@ do_fetch(Offset, Start, End, StoreID, normal) ->
 do_fetch(Offset, Start, End, StoreID, footprint) ->
 	Parent = ar_data_sync:name(StoreID),
 	try
-		Partition = ar_replica_2_9:get_entropy_partition(Offset + ?DATA_CHUNK_SIZE),
-		Footprint = ar_footprint_record:get_footprint(Offset + ?DATA_CHUNK_SIZE),
-
-		FootprintBucket = ar_footprint_record:get_footprint_bucket(Offset + ?DATA_CHUNK_SIZE),
-		{ok, Config} = arweave_config:get_env(),
-		AllPeers =
-			case Config#config.sync_from_local_peers_only of
-				true ->
-					Config#config.local_peers;
-				false ->
-					ar_data_discovery:get_footprint_bucket_peers(FootprintBucket)
-			end,
-		HotPeers = [
-			Peer || Peer <- AllPeers,
-			not ar_rate_limiter:is_on_cooldown(Peer, ?GET_FOOTPRINT_RECORD_RPM_KEY) andalso
-			not ar_rate_limiter:is_throttled(Peer, ?GET_FOOTPRINT_RECORD_PATH)
-		],
-		Peers = ar_data_discovery:pick_peers(HotPeers, ?QUERY_BEST_PEERS_COUNT),
-
-		UnsyncedIntervals =
-			ar_footprint_record:get_unsynced_intervals(Partition, Footprint, StoreID),
-
-		EnqueueIntervals =
-			case ar_intervals:is_empty(UnsyncedIntervals) of
-				true ->
-					[];
-				false ->
-					fetch_peer_footprint_intervals(
-						Parent, Partition, Footprint, Offset, End, Peers, UnsyncedIntervals)
-			end,
-		Offset2 = get_next_fetch_offset(Offset, Start, End),
-		%% Schedule the next sync bucket. The cast handler logic will pause collection if needed.
-		{Offset2, EnqueueIntervals}
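+		%% get_peers/2 returns wait when peers exist for this bucket but they
+		%% are all on cooldown or throttled. Propagate it so that fetch/5
+		%% retries this same Offset after a delay instead of skipping ahead.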
+		case get_peers(Offset, footprint) of
+			wait ->
+				wait;
+			Peers ->
+				Partition = ar_replica_2_9:get_entropy_partition(Offset + ?DATA_CHUNK_SIZE),
+				Footprint = ar_footprint_record:get_footprint(Offset + ?DATA_CHUNK_SIZE),
+				UnsyncedIntervals =
+					ar_footprint_record:get_unsynced_intervals(Partition, Footprint, StoreID),
+
+				EnqueueIntervals =
+					case ar_intervals:is_empty(UnsyncedIntervals) of
+						true ->
+							[];
+						false ->
+							fetch_peer_footprint_intervals(
+								Parent, Partition, Footprint, Offset, End, Peers, UnsyncedIntervals)
+					end,
+				Offset2 = get_next_fetch_offset(Offset, Start, End),
+				%% Schedule the next sync bucket. The cast handler logic will pause collection if needed.
+				{Offset2, EnqueueIntervals}
+		end
 	catch Class:Reason:Stacktrace ->
 		?LOG_WARNING([{event, fetch_footprint_intervals_process_exit},
@@ -177,6 +163,42 @@ get_next_fetch_offset(Offset, Start, End) ->
 %%% Private functions.
 %%%===================================================================
 
+get_peers(Offset, normal) ->
+	Bucket = Offset div ?NETWORK_DATA_BUCKET_SIZE,
+	get_peers2(Bucket,
+		fun(B) -> ar_data_discovery:get_bucket_peers(B) end,
+		?GET_SYNC_RECORD_RPM_KEY,
+		?GET_SYNC_RECORD_PATH);
+get_peers(Offset, footprint) ->
+	FootprintBucket = ar_footprint_record:get_footprint_bucket(Offset + ?DATA_CHUNK_SIZE),
+	get_peers2(FootprintBucket,
+		fun(B) -> ar_data_discovery:get_footprint_bucket_peers(B) end,
+		?GET_FOOTPRINT_RECORD_RPM_KEY,
+		?GET_FOOTPRINT_RECORD_PATH).
+
+get_peers2(Bucket, GetPeersFun, RPMKey, Path) ->
+	{ok, Config} = arweave_config:get_env(),
+	AllPeers =
+		case Config#config.sync_from_local_peers_only of
+			true ->
+				Config#config.local_peers;
+			false ->
+				GetPeersFun(Bucket)
+		end,
+	HotPeers = [
+		Peer || Peer <- AllPeers,
+		not ar_rate_limiter:is_on_cooldown(Peer, RPMKey) andalso
+		not ar_rate_limiter:is_throttled(Peer, Path)
+	],
+	case length(AllPeers) > 0 andalso length(HotPeers) == 0 of
+		true ->
+			%% There are peers for this Offset, but they are all on
+			%% cooldown/throttled, so we'll give them time to recover.
+			wait;
+		false ->
+			ar_data_discovery:pick_peers(HotPeers, ?QUERY_BEST_PEERS_COUNT)
+	end.
+
 %% @doc Collect the unsynced intervals between Start and End excluding the blocklisted
 %% intervals.
 get_unsynced_intervals(Start, End, StoreID) ->