Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 5 additions & 9 deletions apps/arweave/src/ar_data_sync.erl
Original file line number Diff line number Diff line change
Expand Up @@ -1050,7 +1050,9 @@ handle_cast(collect_peer_intervals, State) ->
#sync_data_state{ range_start = Start, range_end = End,
disk_pool_threshold = DiskPoolThreshold,
sync_phase = SyncPhase,
migrations_index = MI } = State,
migrations_index = MI,
store_id = StoreID,
sync_intervals_queue = Q } = State,
CheckIsJoined =
case ar_node:is_joined() of
false ->
Expand Down Expand Up @@ -1090,8 +1092,9 @@ handle_cast(collect_peer_intervals, State) ->
end,
?LOG_DEBUG([{event, collect_peer_intervals_start},
{function, collect_peer_intervals},
{store_id, State#sync_data_state.store_id},
{store_id, StoreID},
{s, Start}, {e, End},
{queue_size, gb_sets:size(Q)},
{is_joined, CheckIsJoined},
{is_footprint_record_migrated, IsFootprintRecordMigrated},
{intersects_disk_pool, IntersectsDiskPool},
Expand Down Expand Up @@ -1204,13 +1207,6 @@ handle_cast({collect_peer_intervals, Offset, Start, End, Type}, State) ->
false ->
%% All checks have passed, find and enqueue intervals for one
%% sync bucket worth of chunks starting at offset Start.
Footprint =
case Type of
footprint ->
ar_footprint_record:get_footprint(Offset + ?DATA_CHUNK_SIZE);
_ ->
ignore
end,
ar_peer_intervals:fetch(Offset, Start, End2, StoreID, Type)
end
end,
Expand Down
149 changes: 82 additions & 67 deletions apps/arweave/src/ar_peer_intervals.erl
Original file line number Diff line number Diff line change
Expand Up @@ -52,45 +52,36 @@ fetch(Offset, Start, End, StoreID, Type) when Offset >= End ->
%% @doc Spawn a linked worker that fetches peer intervals for the sync range
%% starting at Offset and reports the results back to the ar_data_sync
%% gen_server registered for StoreID.
%%
%% On success, enqueue the discovered intervals and schedule collection of the
%% next bucket starting at End2. When do_fetch/5 returns `wait' (peers exist
%% for this offset but all are currently rate-limited), retry the SAME Offset
%% after 1000 ms instead of advancing past unsynced data.
fetch(Offset, Start, End, StoreID, Type) ->
	Parent = ar_data_sync:name(StoreID),
	spawn_link(fun() ->
		case do_fetch(Offset, Start, End, StoreID, Type) of
			{End2, EnqueueIntervals} ->
				gen_server:cast(Parent, {enqueue_intervals, EnqueueIntervals}),
				gen_server:cast(Parent, {collect_peer_intervals, End2, Start, End, Type});
			wait ->
				ar_util:cast_after(1000, Parent,
					{collect_peer_intervals, Offset, Start, End, Type})
		end
	end).

do_fetch(Offset, Start, End, StoreID, normal) ->
Parent = ar_data_sync:name(StoreID),
try
End2 = min(Offset + ?QUERY_RANGE_STEP_SIZE, End),
UnsyncedIntervals = get_unsynced_intervals(Offset, End2, StoreID),

Bucket = Offset div ?NETWORK_DATA_BUCKET_SIZE,
{ok, Config} = arweave_config:get_env(),
AllPeers =
case Config#config.sync_from_local_peers_only of
true ->
Config#config.local_peers;
false ->
ar_data_discovery:get_bucket_peers(Bucket)
end,
HotPeers = [
Peer || Peer <- AllPeers,
not ar_rate_limiter:is_on_cooldown(Peer, ?GET_SYNC_RECORD_RPM_KEY) andalso
not ar_rate_limiter:is_throttled(Peer, ?GET_SYNC_RECORD_PATH)
],
Peers = ar_data_discovery:pick_peers(HotPeers, ?QUERY_BEST_PEERS_COUNT),

{End4, EnqueueIntervals} =
case ar_intervals:is_empty(UnsyncedIntervals) of
true ->
{End2, []};
false ->
{End3, EnqueueIntervals2} =
fetch_peer_intervals(Parent, Offset, Peers, UnsyncedIntervals),
{min(End2, End3), EnqueueIntervals2}
end,
%% Schedule the next sync bucket. The cast handler logic will pause collection
%% if needed.
{End4, EnqueueIntervals}
case get_peers(Offset, normal) of
wait ->
wait;
Peers ->
End2 = min(Offset + ?QUERY_RANGE_STEP_SIZE, End),
UnsyncedIntervals = get_unsynced_intervals(Offset, End2, StoreID),
%% Schedule the next sync bucket. The cast handler logic will pause collection
%% if needed.
case ar_intervals:is_empty(UnsyncedIntervals) of
true ->
{End2, []};
false ->
{End3, EnqueueIntervals2} =
fetch_peer_intervals(Parent, Offset, Peers, UnsyncedIntervals),
{min(End2, End3), EnqueueIntervals2}
end
end
catch
Class:Reason:Stacktrace ->
?LOG_WARNING([{event, fetch_peers_process_exit},
Expand All @@ -108,39 +99,27 @@ do_fetch(Offset, Start, End, StoreID, normal) ->
do_fetch(Offset, Start, End, StoreID, footprint) ->
Parent = ar_data_sync:name(StoreID),
try
Partition = ar_replica_2_9:get_entropy_partition(Offset + ?DATA_CHUNK_SIZE),
Footprint = ar_footprint_record:get_footprint(Offset + ?DATA_CHUNK_SIZE),

FootprintBucket = ar_footprint_record:get_footprint_bucket(Offset + ?DATA_CHUNK_SIZE),
{ok, Config} = arweave_config:get_env(),
AllPeers =
case Config#config.sync_from_local_peers_only of
true ->
Config#config.local_peers;
false ->
ar_data_discovery:get_footprint_bucket_peers(FootprintBucket)
end,
HotPeers = [
Peer || Peer <- AllPeers,
not ar_rate_limiter:is_on_cooldown(Peer, ?GET_FOOTPRINT_RECORD_RPM_KEY) andalso
not ar_rate_limiter:is_throttled(Peer, ?GET_FOOTPRINT_RECORD_PATH)
],
Peers = ar_data_discovery:pick_peers(HotPeers, ?QUERY_BEST_PEERS_COUNT),

UnsyncedIntervals =
ar_footprint_record:get_unsynced_intervals(Partition, Footprint, StoreID),

EnqueueIntervals =
case ar_intervals:is_empty(UnsyncedIntervals) of
true ->
[];
false ->
fetch_peer_footprint_intervals(
Parent, Partition, Footprint, Offset, End, Peers, UnsyncedIntervals)
end,
Offset2 = get_next_fetch_offset(Offset, Start, End),
%% Schedule the next sync bucket. The cast handler logic will pause collection if needed.
{Offset2, EnqueueIntervals}
case get_peers(Offset, footprint) of
wait ->
wait;
Peers ->
Partition = ar_replica_2_9:get_entropy_partition(Offset + ?DATA_CHUNK_SIZE),
Footprint = ar_footprint_record:get_footprint(Offset + ?DATA_CHUNK_SIZE),
UnsyncedIntervals =
ar_footprint_record:get_unsynced_intervals(Partition, Footprint, StoreID),

EnqueueIntervals =
case ar_intervals:is_empty(UnsyncedIntervals) of
true ->
[];
false ->
fetch_peer_footprint_intervals(
Parent, Partition, Footprint, Offset, End, Peers, UnsyncedIntervals)
end,
Offset2 = get_next_fetch_offset(Offset, Start, End),
%% Schedule the next sync bucket. The cast handler logic will pause collection if needed.
{Offset2, EnqueueIntervals}
end
catch
Class:Reason:Stacktrace ->
?LOG_WARNING([{event, fetch_footprint_intervals_process_exit},
Expand Down Expand Up @@ -177,6 +156,42 @@ get_next_fetch_offset(Offset, Start, End) ->
%%% Private functions.
%%%===================================================================

%% @doc Resolve the candidate peer list for the given absolute Offset.
%% `normal' syncing discovers peers by network data bucket; `footprint'
%% syncing discovers peers by the footprint bucket of the chunk at Offset.
%% Returns a picked peer list, or `wait' when every discovered peer is
%% currently rate-limited (see get_peers2/4).
get_peers(Offset, normal) ->
	Bucket = Offset div ?NETWORK_DATA_BUCKET_SIZE,
	%% Use an external fun reference rather than an eta-expanded wrapper.
	get_peers2(Bucket,
		fun ar_data_discovery:get_bucket_peers/1,
		?GET_SYNC_RECORD_RPM_KEY,
		?GET_SYNC_RECORD_PATH);
get_peers(Offset, footprint) ->
	FootprintBucket = ar_footprint_record:get_footprint_bucket(Offset + ?DATA_CHUNK_SIZE),
	get_peers2(FootprintBucket,
		fun ar_data_discovery:get_footprint_bucket_peers/1,
		?GET_FOOTPRINT_RECORD_RPM_KEY,
		?GET_FOOTPRINT_RECORD_PATH).

%% @doc Discover peers for Bucket via GetPeersFun (unless the node is
%% configured to sync from local peers only), drop peers that are on cooldown
%% or throttled for the given request class, and pick the best of the rest.
%%
%% Returns `wait' when peers WERE discovered but all of them are currently
%% rate-limited, so the caller can back off and give them time to recover.
%% When no peers are discovered at all we intentionally do NOT wait: some
%% chunk ranges may have no peers at all (e.g. sparsely replicated data, or a
%% too-restrictive sync_from_local_peers_only configuration), and blocking
%% indefinitely would stall syncing that is possible elsewhere; instead we
%% fall through to pick_peers/2 with the (possibly empty) hot-peer list.
get_peers2(Bucket, GetPeersFun, RPMKey, Path) ->
	{ok, Config} = arweave_config:get_env(),
	AllPeers =
		case Config#config.sync_from_local_peers_only of
			true ->
				Config#config.local_peers;
			false ->
				GetPeersFun(Bucket)
		end,
	HotPeers = [
		Peer || Peer <- AllPeers,
		not ar_rate_limiter:is_on_cooldown(Peer, RPMKey) andalso
		not ar_rate_limiter:is_throttled(Peer, Path)
	],
	%% Pattern-match on list shapes instead of comparing length/1 results —
	%% length/1 traverses the whole list just to test emptiness.
	case {AllPeers, HotPeers} of
		{[_ | _], []} ->
			%% There are peers for this Offset, but they are all on
			%% cooldown/throttled, so we give them time to recover.
			wait;
		_ ->
			ar_data_discovery:pick_peers(HotPeers, ?QUERY_BEST_PEERS_COUNT)
	end.

%% @doc Collect the unsynced intervals between Start and End excluding the blocklisted
%% intervals.
get_unsynced_intervals(Start, End, StoreID) ->
Expand Down
Loading