From bff5c23cd1d1d6bb2e28c34a2109b0112ec13d79 Mon Sep 17 00:00:00 2001 From: James Piechota Date: Sun, 3 Nov 2024 02:04:15 +0000 Subject: [PATCH 1/8] feat: Introduce tool to verify a storage module when the `verify` launch option is set: 1. the node will launch from local state without joining the network 2. it will then iterate through all storage_modules validating different indices and proofs 3. any chunk that fails validation will be flagged as invalid 4. the next time the node is launched with syncing enabled it will try to sync and pack the flagged chunks This change also refactors the code for validating proofs. It introduces a #chunk_proof record which could eventually replace the tuples and maps that are currently used when dealing with proofs. This is just an initial step, though, and the chunk_proof record is only used internally (i.e. no updates to any serialized data structures) --- apps/arweave/include/ar_config.hrl | 1 + apps/arweave/include/ar_poa.hrl | 26 + apps/arweave/src/ar.erl | 5 +- apps/arweave/src/ar_block_index.erl | 6 +- apps/arweave/src/ar_chunk_storage.erl | 59 +- apps/arweave/src/ar_config.erl | 52 +- apps/arweave/src/ar_data_sync.erl | 195 +++--- apps/arweave/src/ar_data_sync_worker.erl | 9 +- apps/arweave/src/ar_mining_stats.erl | 6 +- apps/arweave/src/ar_node_worker.erl | 2 +- apps/arweave/src/ar_nonce_limiter.erl | 33 +- .../src/ar_nonce_limiter_server_sup.erl | 2 +- apps/arweave/src/ar_peers.erl | 21 +- apps/arweave/src/ar_poa.erl | 137 ++++- apps/arweave/src/ar_sup.erl | 1 + apps/arweave/src/ar_verify_chunk_storage.erl | 566 ++++++++++++++++++ .../src/ar_verify_chunk_storage_sup.erl | 40 ++ 17 files changed, 984 insertions(+), 177 deletions(-) create mode 100644 apps/arweave/include/ar_poa.hrl create mode 100644 apps/arweave/src/ar_verify_chunk_storage.erl create mode 100644 apps/arweave/src/ar_verify_chunk_storage_sup.erl diff --git a/apps/arweave/include/ar_config.hrl b/apps/arweave/include/ar_config.hrl index 
3fdaef380..eedc1ffc2 100644 --- a/apps/arweave/include/ar_config.hrl +++ b/apps/arweave/include/ar_config.hrl @@ -119,6 +119,7 @@ init = false, port = ?DEFAULT_HTTP_IFACE_PORT, mine = false, + verify = false, peers = [], block_gossip_peers = [], local_peers = [], diff --git a/apps/arweave/include/ar_poa.hrl b/apps/arweave/include/ar_poa.hrl new file mode 100644 index 000000000..11eccd05d --- /dev/null +++ b/apps/arweave/include/ar_poa.hrl @@ -0,0 +1,26 @@ +-ifndef(AR_POA_HRL). +-define(AR_POA_HRL, true). + +-record(chunk_proof, { + absolute_offset :: non_neg_integer(), + tx_root :: binary(), + tx_path :: binary(), + data_root :: binary(), + data_path :: binary(), + tx_start_offset :: non_neg_integer(), + tx_end_offset :: non_neg_integer(), + block_start_offset :: non_neg_integer(), + block_end_offset :: non_neg_integer(), + chunk_id :: binary(), + chunk_start_offset :: non_neg_integer(), + chunk_end_offset :: non_neg_integer(), + validate_data_path_ruleset :: + 'offset_rebase_support_ruleset' | + 'strict_data_split_ruleset' | + 'strict_borders_ruleset', + tx_path_is_valid = not_validated :: 'not_validated' | 'valid' | 'invalid', + data_path_is_valid = not_validated :: 'not_validated' | 'valid' | 'invalid', + chunk_is_valid = not_validated :: 'not_validated' | 'valid' | 'invalid' +}). + +-endif. diff --git a/apps/arweave/src/ar.erl b/apps/arweave/src/ar.erl index 0de0e7be1..473cadf59 100644 --- a/apps/arweave/src/ar.erl +++ b/apps/arweave/src/ar.erl @@ -342,7 +342,8 @@ show_help() -> "Useful if you have multiple machines (or replicas) " "and you want to monitor them separately on pool"}, {"rocksdb_flush_interval", "RocksDB flush interval in seconds"}, - {"rocksdb_wal_sync_interval", "RocksDB WAL sync interval in seconds"} + {"rocksdb_wal_sync_interval", "RocksDB WAL sync interval in seconds"}, + {"verify", "Run in verify mode. In verify mode, the node won't join the network and won't compute or request VDF steps. Also no mining or packing."} ] ), erlang:halt(). 
@@ -373,6 +374,8 @@ read_config_from_file(Path) -> parse_cli_args([], C) -> C; parse_cli_args(["mine" | Rest], C) -> parse_cli_args(Rest, C#config{ mine = true }); +parse_cli_args(["verify" | Rest], C) -> + parse_cli_args(Rest, C#config{ verify = true }); parse_cli_args(["peer", Peer | Rest], C = #config{ peers = Ps }) -> case ar_util:safe_parse_peer(Peer) of {ok, ValidPeer} -> diff --git a/apps/arweave/src/ar_block_index.erl b/apps/arweave/src/ar_block_index.erl index ce482565f..64286d29f 100644 --- a/apps/arweave/src/ar_block_index.erl +++ b/apps/arweave/src/ar_block_index.erl @@ -1,7 +1,7 @@ -module(ar_block_index). -export([init/1, update/2, member/1, get_list/1, get_list_by_hash/1, get_element_by_height/1, - get_block_bounds/1, get_intersection/2, get_intersection/1, get_range/2]). + get_block_bounds/1, get_intersection/2, get_intersection/1, get_range/2, get_last/0]). %%%=================================================================== %%% Public interface. @@ -96,6 +96,10 @@ get_range(Start, End) -> {error, invalid_start} end. +%% @doc Return the last element in the block index. +get_last() -> + ets:last(block_index). + %%%=================================================================== %%% Private functions. %%%=================================================================== diff --git a/apps/arweave/src/ar_chunk_storage.erl b/apps/arweave/src/ar_chunk_storage.erl index c8fde4c2b..45a26e3fc 100644 --- a/apps/arweave/src/ar_chunk_storage.erl +++ b/apps/arweave/src/ar_chunk_storage.erl @@ -37,15 +37,15 @@ start_link(Name, StoreID) -> gen_server:start_link({local, Name}, ?MODULE, StoreID, []). %% @doc Store the chunk under the given end offset, -%% bytes Offset - ?DATA_CHUNK_SIIZE, Offset - ?DATA_CHUNK_SIIZE + 1, .., Offset - 1. -put(Offset, Chunk) -> - put(Offset, Chunk, "default"). +%% bytes Offset - ?DATA_CHUNK_SIZE, Offset - ?DATA_CHUNK_SIZE + 1, .., Offset - 1. +put(PaddedOffset, Chunk) -> + put(PaddedOffset, Chunk, "default"). 
%% @doc Store the chunk under the given end offset, -%% bytes Offset - ?DATA_CHUNK_SIIZE, Offset - ?DATA_CHUNK_SIIZE + 1, .., Offset - 1. -put(Offset, Chunk, StoreID) -> +%% bytes Offset - ?DATA_CHUNK_SIZE, Offset - ?DATA_CHUNK_SIZE + 1, .., Offset - 1. +put(PaddedOffset, Chunk, StoreID) -> GenServerID = list_to_atom("ar_chunk_storage_" ++ ar_storage_module:label_by_id(StoreID)), - case catch gen_server:call(GenServerID, {put, Offset, Chunk}) of + case catch gen_server:call(GenServerID, {put, PaddedOffset, Chunk}) of {'EXIT', {timeout, {gen_server, call, _}}} -> {error, timeout}; Reply -> @@ -155,9 +155,9 @@ delete(Offset) -> delete(Offset, "default"). %% @doc Remove the chunk with the given end offset. -delete(Offset, StoreID) -> +delete(PaddedOffset, StoreID) -> GenServerID = list_to_atom("ar_chunk_storage_" ++ ar_storage_module:label_by_id(StoreID)), - case catch gen_server:call(GenServerID, {delete, Offset}, 20000) of + case catch gen_server:call(GenServerID, {delete, PaddedOffset}, 20000) of {'EXIT', {timeout, {gen_server, call, _}}} -> {error, timeout}; Reply -> @@ -297,22 +297,22 @@ handle_cast(Cast, State) -> ?LOG_WARNING([{event, unhandled_cast}, {module, ?MODULE}, {cast, Cast}]), {noreply, State}. 
-handle_call({put, Offset, Chunk}, _From, State) when byte_size(Chunk) == ?DATA_CHUNK_SIZE -> +handle_call({put, PaddedOffset, Chunk}, _From, State) when byte_size(Chunk) == ?DATA_CHUNK_SIZE -> #state{ file_index = FileIndex, store_id = StoreID } = State, - case handle_store_chunk(Offset, Chunk, FileIndex, StoreID) of + case handle_store_chunk(PaddedOffset, Chunk, FileIndex, StoreID) of {ok, FileIndex2} -> {reply, ok, State#state{ file_index = FileIndex2 }}; Error -> {reply, Error, State} end; -handle_call({delete, Offset}, _From, State) -> +handle_call({delete, PaddedOffset}, _From, State) -> #state{ file_index = FileIndex, store_id = StoreID } = State, - Key = get_key(Offset), + Key = get_key(PaddedOffset), Filepath = filepath(Key, FileIndex, StoreID), - case ar_sync_record:delete(Offset, Offset - ?DATA_CHUNK_SIZE, ?MODULE, StoreID) of + case ar_sync_record:delete(PaddedOffset, PaddedOffset - ?DATA_CHUNK_SIZE, ?MODULE, StoreID) of ok -> - case delete_chunk(Offset, Key, Filepath) of + case delete_chunk(PaddedOffset, Key, Filepath) of ok -> {reply, ok, State}; Error2 -> @@ -437,11 +437,12 @@ get_filepath(Name, StoreID) -> filename:join([DataDir, "storage_modules", StoreID, ?CHUNK_DIR, Name]) end. -handle_store_chunk(Offset, Chunk, FileIndex, StoreID) -> - Key = get_key(Offset), - case store_chunk(Key, Offset, Chunk, FileIndex, StoreID) of +handle_store_chunk(PaddedOffset, Chunk, FileIndex, StoreID) -> + Key = get_key(PaddedOffset), + case store_chunk(Key, PaddedOffset, Chunk, FileIndex, StoreID) of {ok, Filepath} -> - case ar_sync_record:add(Offset, Offset - ?DATA_CHUNK_SIZE, ?MODULE, StoreID) of + case ar_sync_record:add( + PaddedOffset, PaddedOffset - ?DATA_CHUNK_SIZE, ?MODULE, StoreID) of ok -> ets:insert(chunk_storage_file_index, {{Key, StoreID}, Filepath}), {ok, maps:put(Key, Filepath, FileIndex)}; @@ -456,9 +457,9 @@ get_key(Offset) -> StartOffset = Offset - ?DATA_CHUNK_SIZE, ar_util:floor_int(StartOffset, get_chunk_group_size()). 
-store_chunk(Key, Offset, Chunk, FileIndex, StoreID) -> +store_chunk(Key, PaddedOffset, Chunk, FileIndex, StoreID) -> Filepath = filepath(Key, FileIndex, StoreID), - store_chunk(Key, Offset, Chunk, Filepath). + store_chunk(Key, PaddedOffset, Chunk, Filepath). filepath(Key, FileIndex, StoreID) -> case maps:get(Key, FileIndex, not_found) of @@ -468,28 +469,28 @@ filepath(Key, FileIndex, StoreID) -> Filepath end. -store_chunk(Key, Offset, Chunk, Filepath) -> +store_chunk(Key, PaddedOffset, Chunk, Filepath) -> case erlang:get({write_handle, Filepath}) of undefined -> case file:open(Filepath, [read, write, raw]) of {error, Reason} = Error -> ?LOG_ERROR([ {event, failed_to_open_chunk_file}, - {offset, Offset}, + {padded_offset, PaddedOffset}, {file, Filepath}, {reason, io_lib:format("~p", [Reason])} ]), Error; {ok, F} -> erlang:put({write_handle, Filepath}, F), - store_chunk2(Key, Offset, Chunk, Filepath, F) + store_chunk2(Key, PaddedOffset, Chunk, Filepath, F) end; F -> - store_chunk2(Key, Offset, Chunk, Filepath, F) + store_chunk2(Key, PaddedOffset, Chunk, Filepath, F) end. -store_chunk2(Key, Offset, Chunk, Filepath, F) -> - StartOffset = Offset - ?DATA_CHUNK_SIZE, +store_chunk2(Key, PaddedOffset, Chunk, Filepath, F) -> + StartOffset = PaddedOffset - ?DATA_CHUNK_SIZE, LeftChunkBorder = ar_util:floor_int(StartOffset, ?DATA_CHUNK_SIZE), ChunkOffset = StartOffset - LeftChunkBorder, RelativeOffset = LeftChunkBorder - Key, @@ -507,7 +508,7 @@ store_chunk2(Key, Offset, Chunk, Filepath, F) -> {error, Reason} = Error -> ?LOG_ERROR([ {event, failed_to_write_chunk}, - {offset, Offset}, + {padded_offset, PaddedOffset}, {file, Filepath}, {position, Position}, {reason, io_lib:format("~p", [Reason])} @@ -518,10 +519,10 @@ store_chunk2(Key, Offset, Chunk, Filepath, F) -> {ok, Filepath} end. 
-delete_chunk(Offset, Key, Filepath) -> +delete_chunk(PaddedOffset, Key, Filepath) -> case file:open(Filepath, [read, write, raw]) of {ok, F} -> - StartOffset = Offset - ?DATA_CHUNK_SIZE, + StartOffset = PaddedOffset - ?DATA_CHUNK_SIZE, LeftChunkBorder = ar_util:floor_int(StartOffset, ?DATA_CHUNK_SIZE), RelativeOffset = LeftChunkBorder - Key, Position = RelativeOffset + ?OFFSET_SIZE * (RelativeOffset div ?DATA_CHUNK_SIZE), diff --git a/apps/arweave/src/ar_config.erl b/apps/arweave/src/ar_config.erl index 37ab4fcb7..ad2479f90 100644 --- a/apps/arweave/src/ar_config.erl +++ b/apps/arweave/src/ar_config.erl @@ -1,8 +1,8 @@ -module(ar_config). --export([validate_config/1, use_remote_vdf_server/0, pull_from_remote_vdf_server/0, - compute_own_vdf/0, is_vdf_server/0, is_public_vdf_server/0, - parse/1, parse_storage_module/1, log_config/1]). +-export([validate_config/1, auto_join/0, verify/0, use_remote_vdf_server/0, + pull_from_remote_vdf_server/0, compute_own_vdf/0, is_vdf_server/0, + is_public_vdf_server/0, parse/1, parse_storage_module/1, log_config/1]). -include_lib("arweave/include/ar.hrl"). -include_lib("arweave/include/ar_consensus.hrl"). @@ -15,9 +15,19 @@ validate_config(Config) -> validate_init(Config) andalso + validate_storage_modules(Config) andalso validate_repack_in_place(Config) andalso validate_cm_pool(Config) andalso - validate_storage_modules(Config). + validate_packing_difficulty(Config) andalso + validate_verify(Config). + +auto_join() -> + {ok, Config} = application:get_env(arweave, config), + Config#config.auto_join andalso not Config#config.verify. + +verify() -> + {ok, Config} = application:get_env(arweave, config), + Config#config.verify. 
use_remote_vdf_server() -> {ok, Config} = application:get_env(arweave, config), @@ -167,6 +177,13 @@ parse_options([{<<"mine">>, false} | Rest], Config) -> parse_options([{<<"mine">>, Opt} | _], _) -> {error, {bad_type, mine, boolean}, Opt}; +parse_options([{<<"verify">>, true} | Rest], Config) -> + parse_options(Rest, Config#config{ verify = true }); +parse_options([{<<"verify">>, false} | Rest], Config) -> + parse_options(Rest, Config); +parse_options([{<<"verify">>, Opt} | _], _) -> + {error, {bad_type, verify, boolean}, Opt}; + parse_options([{<<"port">>, Port} | Rest], Config) when is_integer(Port) -> parse_options(Rest, Config#config{ port = Port }); parse_options([{<<"port">>, Port} | _], _) -> @@ -884,6 +901,15 @@ validate_init(Config) -> false -> true end. + +validate_storage_modules(#config{ storage_modules = StorageModules }) -> + case length(StorageModules) =:= length(lists:usort(StorageModules)) of + true -> + true; + false -> + io:format("~nDuplicate value detected in the storage_modules option.~n~n"), + false + end. validate_repack_in_place(Config) -> Modules = [ar_storage_module:id(M) || M <- Config#config.storage_modules], validate_repack_in_place(Config#config.repack_in_place_storage_modules, Modules). @@ -927,9 +953,9 @@ validate_cm_pool(Config) -> end, A andalso B andalso C. -validate_storage_modules(#config{ mine = false }) -> +validate_packing_difficulty(#config{ mine = false }) -> true; -validate_storage_modules(Config) -> +validate_packing_difficulty(Config) -> MiningAddr = Config#config.mining_addr, UniquePackingDifficulties = lists:foldl( fun({_, _, {composite, Addr, Difficulty}}, Acc) when Addr =:= MiningAddr -> @@ -950,3 +976,17 @@ validate_storage_modules(Config) -> "for the same mining address.~n~n"), false end. 
+ + +validate_verify(#config{ verify = false }) -> + true; +validate_verify(#config{ verify = true, mine = true }) -> + io:format("~nThe verify flag cannot be set together with the mine flag.~n~n"), + false; +validate_verify(#config{ verify = true, + repack_in_place_storage_modules = RepackInPlaceStorageModules }) + when RepackInPlaceStorageModules =/= [] -> + io:format("~nThe verify flag cannot be set together with the repack_in_place flag.~n~n"), + false; +validate_verify(_Config) -> + true. diff --git a/apps/arweave/src/ar_data_sync.erl b/apps/arweave/src/ar_data_sync.erl index 252be3a90..bdb11616b 100644 --- a/apps/arweave/src/ar_data_sync.erl +++ b/apps/arweave/src/ar_data_sync.erl @@ -3,14 +3,16 @@ -behaviour(gen_server). -export([name/1, start_link/2, join/1, add_tip_block/2, add_block/2, - is_chunk_proof_ratio_attractive/3, + invalidate_bad_data_record/4, is_chunk_proof_ratio_attractive/3, add_chunk/5, add_data_root_to_disk_pool/3, maybe_drop_data_root_from_disk_pool/3, get_chunk/2, get_chunk_proof/2, get_tx_data/1, get_tx_data/2, get_tx_offset/1, get_tx_offset_data_in_range/2, has_data_root/2, request_tx_data_removal/3, request_data_removal/4, record_disk_pool_chunks_count/0, record_chunk_cache_size_metric/0, is_chunk_cache_full/0, is_disk_space_sufficient/1, - get_chunk_by_byte/2, read_chunk/4, decrement_chunk_cache_size/0, - increment_chunk_cache_size/0, get_chunk_padded_offset/1, get_chunk_metadata_range/3]). + get_chunk_by_byte/2, get_chunk_seek_offset/1, read_chunk/4, read_data_path/2, + increment_chunk_cache_size/0, decrement_chunk_cache_size/0, + get_chunk_padded_offset/1, get_chunk_metadata_range/3, + get_merkle_rebase_threshold/0, should_store_in_chunk_storage/3]). -export([debug_get_disk_pool_chunks/0]). @@ -20,6 +22,7 @@ -include_lib("arweave/include/ar.hrl"). -include_lib("arweave/include/ar_consensus.hrl"). -include_lib("arweave/include/ar_config.hrl"). +-include_lib("arweave/include/ar_poa.hrl"). 
-include_lib("arweave/include/ar_data_discovery.hrl"). -include_lib("arweave/include/ar_data_sync.hrl"). -include_lib("arweave/include/ar_sync_buckets.hrl"). @@ -48,6 +51,10 @@ join(RecentBI) -> add_tip_block(BlockTXPairs, RecentBI) -> gen_server:cast(ar_data_sync_default, {add_tip_block, BlockTXPairs, RecentBI}). +invalidate_bad_data_record(Start, End, StoreID, Case) -> + gen_server:cast(name(StoreID), {invalidate_bad_data_record, + {Start, End, StoreID, Case}}). + %% @doc The condition which is true if the chunk is too small compared to the proof. %% Small chunks make syncing slower and increase space amplification. A small chunk %% is accepted if it is the last chunk of the corresponding transaction - such chunks @@ -542,6 +549,8 @@ read_chunk(Offset, ChunkDataDB, ChunkDataKey, StoreID) -> Error end. +read_data_path(ChunkDataDB, ChunkDataKey) -> + read_data_path(undefined, ChunkDataDB, ChunkDataKey, undefined). %% The first and last arguments are introduced to match the read_chunk/4 signature. 
read_data_path(_Offset, ChunkDataDB, ChunkDataKey, _StoreID) -> case ar_kv:get(ChunkDataDB, ChunkDataKey) of @@ -721,7 +730,7 @@ handle_cast({join, RecentBI}, State) -> PreviousWeaveSize = element(2, hd(CurrentBI)), {ok, OrphanedDataRoots} = remove_orphaned_data(State, Offset, PreviousWeaveSize), {ok, Config} = application:get_env(arweave, config), - [gen_server:cast(list_to_atom("ar_data_sync_" ++ ar_storage_module:label(Module)), + [gen_server:cast(name(Module), {cut, Offset}) || Module <- Config#config.storage_modules], ok = ar_chunk_storage:cut(Offset, StoreID), ok = ar_sync_record:cut(Offset, ?MODULE, StoreID), @@ -1584,7 +1593,7 @@ get_chunk(Offset, SeekOffset, Pack, Packing, StoredPacking, StoreID, IsMinerRequ {expected_chunk_id, ar_util:encode(ChunkID)}, {chunk_id, ar_util:encode(ComputedChunkID)}]), invalidate_bad_data_record({AbsoluteOffset - ChunkSize, - AbsoluteOffset, {chunks_index, StoreID}, StoreID, 4}), + AbsoluteOffset, StoreID, 4}), {error, chunk_not_found} end end @@ -1668,8 +1677,7 @@ read_chunk_with_metadata( false -> ok end, - invalidate_bad_data_record({SeekOffset - 1, AbsoluteOffset, - {chunks_index, StoreID}, StoreID, 1}), + invalidate_bad_data_record({SeekOffset - 1, AbsoluteOffset, StoreID, 1}), {error, chunk_not_found}; {error, Error} -> ?LOG_ERROR([{event, failed_to_read_chunk}, @@ -1706,7 +1714,7 @@ read_chunk_with_metadata( end end. 
-invalidate_bad_data_record({Start, End, ChunksIndex, StoreID, Case}) -> +invalidate_bad_data_record({Start, End, StoreID, Case}) -> [{_, T}] = ets:lookup(ar_data_sync_state, disk_pool_threshold), case End > T of true -> @@ -1726,7 +1734,8 @@ invalidate_bad_data_record({Start, End, ChunksIndex, StoreID, Case}) -> {range_start, PaddedStart2}, {range_end, PaddedEnd}]), case ar_sync_record:delete(PaddedEnd, PaddedStart2, ?MODULE, StoreID) of ok -> - case ar_kv:delete(ChunksIndex, << End:?OFFSET_KEY_BITSIZE >>) of + ar_sync_record:add(PaddedEnd, PaddedStart2, invalid_chunks, StoreID), + case ar_kv:delete({chunks_index, StoreID}, << End:?OFFSET_KEY_BITSIZE >>) of ok -> ok; Error2 -> @@ -1761,10 +1770,9 @@ validate_fetched_chunk(Args) -> {BlockStart, BlockEnd, TXRoot} -> ValidateDataPathRuleset = ar_poa:get_data_path_validation_ruleset( BlockStart, get_merkle_rebase_threshold()), - BlockSize = BlockEnd - BlockStart, ChunkOffset = Offset - BlockStart - 1, - case validate_proof2({TXRoot, ChunkOffset, BlockSize, DataPath, TXPath, - ChunkSize, ValidateDataPathRuleset, IsMinerRequest}) of + case validate_proof2(TXRoot, TXPath, DataPath, BlockStart, BlockEnd, + ChunkOffset, ValidateDataPathRuleset, ChunkSize, IsMinerRequest) of {true, ChunkID} -> {true, ChunkID}; false -> @@ -1778,8 +1786,7 @@ validate_fetched_chunk(Args) -> ok end, StartOffset = Offset - ChunkSize, - invalidate_bad_data_record({StartOffset, Offset, - {chunks_index, StoreID}, StoreID, 2}), + invalidate_bad_data_record({StartOffset, Offset, StoreID, 2}), false end; {_BlockStart, _BlockEnd, TXRoot2} -> @@ -1789,8 +1796,7 @@ validate_fetched_chunk(Args) -> {tx_root, ar_util:encode(TXRoot2)}, {stored_tx_root, ar_util:encode(TXRoot)}, {store_id, StoreID}]), - invalidate_bad_data_record({Offset - ChunkSize, Offset, - {chunks_index, StoreID}, StoreID, 3}), + invalidate_bad_data_record({Offset - ChunkSize, Offset, StoreID, 3}), false end end. 
@@ -1922,10 +1928,8 @@ remove_range(Start, End, Ref, ReplyTo) -> RefL = [make_ref() || _ <- StoreIDs], PID = spawn(fun() -> ReplyFun(ReplyFun, sets:from_list(RefL)) end), lists:foreach( - fun({StorageID, R}) -> - GenServerID = list_to_atom("ar_data_sync_" - ++ ar_storage_module:label_by_id(StorageID)), - gen_server:cast(GenServerID, {remove_range, End, Start + 1, R, PID}) + fun({StoreID, R}) -> + gen_server:cast(name(StoreID), {remove_range, End, Start + 1, R, PID}) end, lists:zip(StoreIDs, RefL) ). @@ -2458,6 +2462,12 @@ store_sync_state(#sync_data_state{ store_id = "default" } = State) -> store_sync_state(_State) -> ok. +%% @doc Look to StoreID to find data that TargetStoreID is missing. +%% Args: +%% TargetStoreID - The ID of the storage module to sync to (this module is missing data) +%% StoreID - The ID of the storage module to sync from (this module might have the data) +%% RangeStart - The start offset of the range to check +%% RangeEnd - The end offset of the range to check get_unsynced_intervals_from_other_storage_modules(TargetStoreID, StoreID, RangeStart, RangeEnd) -> get_unsynced_intervals_from_other_storage_modules(TargetStoreID, StoreID, RangeStart, @@ -2545,78 +2555,93 @@ enqueue_peer_range(Peer, RangeStart, RangeEnd, ChunkOffsets, {Q, QIntervals}) -> validate_proof(TXRoot, BlockStartOffset, Offset, BlockSize, Proof, ValidateDataPathRuleset) -> #{ data_path := DataPath, tx_path := TXPath, chunk := Chunk, packing := Packing } = Proof, - case ar_merkle:validate_path(TXRoot, Offset, BlockSize, TXPath) of - false -> + + BlockEndOffset = BlockStartOffset + BlockSize, + case ar_poa:validate_paths(TXRoot, TXPath, DataPath, BlockStartOffset, + BlockEndOffset, Offset, ValidateDataPathRuleset) of + {false, _} -> false; - {DataRoot, TXStartOffset, TXEndOffset} -> + {true, ChunkProof} -> + #chunk_proof{ + data_root = DataRoot, + absolute_offset = AbsoluteOffset, + chunk_id = ChunkID, + chunk_start_offset = ChunkStartOffset, + chunk_end_offset = ChunkEndOffset, + 
tx_start_offset = TXStartOffset, + tx_end_offset = TXEndOffset + } = ChunkProof, TXSize = TXEndOffset - TXStartOffset, - ChunkOffset = Offset - TXStartOffset, - case ar_merkle:validate_path(DataRoot, ChunkOffset, TXSize, DataPath, - ValidateDataPathRuleset) of - false -> - false; - {ChunkID, ChunkStartOffset, ChunkEndOffset} -> - AbsoluteEndOffset = BlockStartOffset + TXStartOffset + ChunkEndOffset, - ChunkSize = ChunkEndOffset - ChunkStartOffset, - case Packing of - unpacked -> - case ar_tx:generate_chunk_id(Chunk) == ChunkID of - false -> - false; + ChunkSize = ChunkEndOffset - ChunkStartOffset, + AbsoluteEndOffset = AbsoluteOffset + ChunkSize, + case Packing of + unpacked -> + case ar_tx:generate_chunk_id(Chunk) == ChunkID of + false -> + false; + true -> + case ChunkSize == byte_size(Chunk) of true -> - case ChunkSize == byte_size(Chunk) of - true -> - {true, DataRoot, TXStartOffset, ChunkEndOffset, - TXSize, ChunkSize, ChunkID}; - false -> - false - end - end; - _ -> - ChunkArgs = {Packing, Chunk, AbsoluteEndOffset, TXRoot, ChunkSize}, - Args = {Packing, DataRoot, TXStartOffset, ChunkEndOffset, TXSize, - ChunkID}, - {need_unpacking, AbsoluteEndOffset, ChunkArgs, Args} - end + {true, DataRoot, TXStartOffset, ChunkEndOffset, + TXSize, ChunkSize, ChunkID}; + false -> + false + end + end; + _ -> + ChunkArgs = {Packing, Chunk, AbsoluteEndOffset, TXRoot, ChunkSize}, + Args = {Packing, DataRoot, TXStartOffset, ChunkEndOffset, TXSize, + ChunkID}, + {need_unpacking, AbsoluteEndOffset, ChunkArgs, Args} end end. 
-validate_proof2(Args) -> - {TXRoot, Offset, BlockSize, DataPath, TXPath, ChunkSize, - ValidateDataPathRuleset, IsMinerRequest} = Args, - case ar_merkle:validate_path(TXRoot, Offset, BlockSize, TXPath) of - false -> - case IsMinerRequest of - true -> - ?LOG_ERROR([{event, failed_to_validate_tx_path}, - {tags, [solution_proofs]}]); +validate_proof2( + TXRoot, TXPath, DataPath, BlockStartOffset, + BlockEndOffset, BlockRelativeOffset, ValidateDataPathRuleset, + ExpectedChunkSize, IsMinerRequest) -> + {IsValid, ChunkProof} = ar_poa:validate_paths( + TXRoot, TXPath, DataPath, BlockStartOffset, + BlockEndOffset, BlockRelativeOffset, ValidateDataPathRuleset), + case IsValid of + true -> + #chunk_proof{ + chunk_id = ChunkID, + chunk_start_offset = ChunkStartOffset, + chunk_end_offset = ChunkEndOffset + } = ChunkProof, + case ChunkEndOffset - ChunkStartOffset == ExpectedChunkSize of false -> - ok - end, - false; - {DataRoot, TXStartOffset, TXEndOffset} -> - TXSize = TXEndOffset - TXStartOffset, - ChunkOffset = Offset - TXStartOffset, - case ar_merkle:validate_path(DataRoot, ChunkOffset, TXSize, DataPath, - ValidateDataPathRuleset) of - {ChunkID, ChunkStartOffset, ChunkEndOffset} -> - case ChunkEndOffset - ChunkStartOffset == ChunkSize of + case IsMinerRequest of + true -> + ?LOG_ERROR([{event, failed_to_validate_data_path_offset}, + {tags, [solution_proofs]}, + {chunk_end_offset, ChunkEndOffset}, + {chunk_start_offset, ChunkStartOffset}, + {chunk_size, ExpectedChunkSize}]); false -> - case IsMinerRequest of - true -> - ?LOG_ERROR([{event, failed_to_validate_data_path_offset}, - {tags, [solution_proofs]}, - {chunk_end_offset, ChunkEndOffset}, - {chunk_start_offset, ChunkStartOffset}, - {chunk_size, ChunkSize}]); - false -> - ok - end, - false; + ok + end, + false; + true -> + {true, ChunkID} + end; + false -> + #chunk_proof{ + tx_path_is_valid = TXPathIsValid, + data_path_is_valid = DataPathIsValid + } = ChunkProof, + case {TXPathIsValid, DataPathIsValid} of + {invalid, _} 
-> + case IsMinerRequest of true -> - {true, ChunkID} - end; - _ -> + ?LOG_ERROR([{event, failed_to_validate_tx_path}, + {tags, [solution_proofs]}]); + false -> + ok + end, + false; + {_, invalid} -> case IsMinerRequest of true -> ?LOG_ERROR([{event, failed_to_validate_data_path}, @@ -3357,9 +3382,7 @@ process_disk_pool_matured_chunk_offset(Iterator, TXRoot, TXPath, AbsoluteOffset, increment_chunk_cache_size(), Args2 = {DataRoot, AbsoluteOffset, TXPath, TXRoot, DataPath, unpacked, Offset, ChunkSize, Chunk, Chunk, none, none}, - Label = ar_storage_module:label_by_id(StoreID6), - gen_server:cast(list_to_atom("ar_data_sync_" ++ Label), - {pack_and_store_chunk, Args2}), + gen_server:cast(name(StoreID6), {pack_and_store_chunk, Args2}), gen_server:cast(self(), {process_disk_pool_chunk_offsets, Iterator, false, Args}), {noreply, cache_recently_processed_offset(AbsoluteOffset, ChunkDataKey, diff --git a/apps/arweave/src/ar_data_sync_worker.erl b/apps/arweave/src/ar_data_sync_worker.erl index e28a8ad5f..1302d4b8a 100644 --- a/apps/arweave/src/ar_data_sync_worker.erl +++ b/apps/arweave/src/ar_data_sync_worker.erl @@ -144,10 +144,8 @@ read_range2(MessagesRemaining, {Start, End, OriginStoreID, TargetStoreID, SkipSm read_range2(MessagesRemaining, {Start + ChunkSize, End, OriginStoreID, TargetStoreID, SkipSmall}); not_found -> - Label = ar_storage_module:label_by_id(OriginStoreID), - gen_server:cast(list_to_atom("ar_data_sync_" ++ Label), - {invalidate_bad_data_record, {Start, AbsoluteOffset, ChunksIndex, - OriginStoreID, 1}}), + ar_data_sync:invalidate_bad_data_record( + Start, AbsoluteOffset, OriginStoreID, 1), read_range2(MessagesRemaining-1, {Start + ChunkSize, End, OriginStoreID, TargetStoreID, SkipSmall}); {error, Error} -> @@ -172,8 +170,7 @@ read_range2(MessagesRemaining, {Start, End, OriginStoreID, TargetStoreID, SkipSm Args = {DataRoot, AbsoluteOffset, TXPath, TXRoot, DataPath, Packing, RelativeOffset, ChunkSize, Chunk, UnpackedChunk, TargetStoreID, ChunkDataKey}, - 
gen_server:cast(list_to_atom("ar_data_sync_" - ++ ar_storage_module:label_by_id(TargetStoreID)), + gen_server:cast(ar_data_sync:name(TargetStoreID), {pack_and_store_chunk, Args}), read_range2(MessagesRemaining-1, {Start + ChunkSize, End, OriginStoreID, TargetStoreID, diff --git a/apps/arweave/src/ar_mining_stats.erl b/apps/arweave/src/ar_mining_stats.erl index 11339a32c..a698b62a5 100644 --- a/apps/arweave/src/ar_mining_stats.erl +++ b/apps/arweave/src/ar_mining_stats.erl @@ -338,9 +338,11 @@ get_packing() -> undefined; MultiplePackings -> % More than one unique packing found - ?LOG_ERROR([ + ?LOG_WARNING([ {event, get_packing_failed}, {reason, multiple_unique_packings}, - {unique_packings, [format_packing(Packing) || Packing <- MultiplePackings]} + {unique_packings, + string:join( + [format_packing(Packing) || Packing <- MultiplePackings], ", ")} ]), undefined end. diff --git a/apps/arweave/src/ar_node_worker.erl b/apps/arweave/src/ar_node_worker.erl index ebedf43c0..6ba61fba7 100644 --- a/apps/arweave/src/ar_node_worker.erl +++ b/apps/arweave/src/ar_node_worker.erl @@ -93,7 +93,7 @@ init([]) -> validate_trusted_peers(Config), StartFromLocalState = Config#config.start_from_latest_state orelse Config#config.start_from_block /= undefined, - case {StartFromLocalState, Config#config.init, Config#config.auto_join} of + case {StartFromLocalState, Config#config.init, ar_config:auto_join()} of {false, false, true} -> ar_join:start(ar_peers:get_trusted_peers()); {true, _, _} -> diff --git a/apps/arweave/src/ar_nonce_limiter.erl b/apps/arweave/src/ar_nonce_limiter.erl index afbcd2bf2..90d9441e5 100644 --- a/apps/arweave/src/ar_nonce_limiter.erl +++ b/apps/arweave/src/ar_nonce_limiter.erl @@ -408,22 +408,27 @@ apply_external_update(Update, Peer) -> %%%=================================================================== init([]) -> - ok = ar_events:subscribe(node_state), - State = - case ar_node:is_joined() of - true -> - Blocks = get_blocks(), - handle_initialized(Blocks, 
#state{}); - _ -> - #state{} - end, - case ar_config:use_remote_vdf_server() and not ar_config:compute_own_vdf() of + case ar_config:verify() of true -> - gen_server:cast(?MODULE, check_external_vdf_server_input); + ignore; false -> - ok - end, - {ok, start_worker(State#state{ autocompute = ar_config:compute_own_vdf() })}. + ok = ar_events:subscribe(node_state), + State = + case ar_node:is_joined() of + true -> + Blocks = get_blocks(), + handle_initialized(Blocks, #state{}); + _ -> + #state{} + end, + case ar_config:use_remote_vdf_server() and not ar_config:compute_own_vdf() of + true -> + gen_server:cast(?MODULE, check_external_vdf_server_input); + false -> + ok + end, + {ok, start_worker(State#state{ autocompute = ar_config:compute_own_vdf() })} + end. get_blocks() -> B = ar_node:get_current_block(), diff --git a/apps/arweave/src/ar_nonce_limiter_server_sup.erl b/apps/arweave/src/ar_nonce_limiter_server_sup.erl index fb3898e2a..eab8540a4 100644 --- a/apps/arweave/src/ar_nonce_limiter_server_sup.erl +++ b/apps/arweave/src/ar_nonce_limiter_server_sup.erl @@ -21,7 +21,7 @@ start_link() -> %% =================================================================== init([]) -> - case ar_config:is_vdf_server() of + case ar_config:is_vdf_server() andalso not ar_config:verify() of false -> ignore; true -> diff --git a/apps/arweave/src/ar_peers.erl b/apps/arweave/src/ar_peers.erl index 6b6341478..fa643d43e 100644 --- a/apps/arweave/src/ar_peers.erl +++ b/apps/arweave/src/ar_peers.erl @@ -375,13 +375,20 @@ resolve_and_cache_peer(RawPeer, Type) -> %%%=================================================================== init([]) -> - %% Trap exit to avoid corrupting any open files on quit. 
- process_flag(trap_exit, true), - ok = ar_events:subscribe(block), - load_peers(), - gen_server:cast(?MODULE, rank_peers), - gen_server:cast(?MODULE, ping_peers), - timer:apply_interval(?GET_MORE_PEERS_FREQUENCY_MS, ?MODULE, discover_peers, []), + {ok, Config} = application:get_env(arweave, config), + case Config#config.verify of + true -> + ok; + false -> + %% Trap exit to avoid corrupting any open files on quit. + process_flag(trap_exit, true), + ok = ar_events:subscribe(block), + load_peers(), + gen_server:cast(?MODULE, rank_peers), + gen_server:cast(?MODULE, ping_peers), + timer:apply_interval(?GET_MORE_PEERS_FREQUENCY_MS, ?MODULE, discover_peers, []) + end, + {ok, #state{}}. handle_call(Request, _From, State) -> diff --git a/apps/arweave/src/ar_poa.erl b/apps/arweave/src/ar_poa.erl index 6bb306d80..fe71c192e 100644 --- a/apps/arweave/src/ar_poa.erl +++ b/apps/arweave/src/ar_poa.erl @@ -3,8 +3,10 @@ -module(ar_poa). -export([get_data_path_validation_ruleset/2, get_data_path_validation_ruleset/3, - validate_pre_fork_2_5/4, validate/1, get_padded_offset/2]). + validate_pre_fork_2_5/4, validate/1, validate_paths/4, validate_paths/7, + get_padded_offset/2]). +-include_lib("arweave/include/ar_poa.hrl"). -include_lib("arweave/include/ar.hrl"). -include_lib("arweave/include/ar_consensus.hrl"). -include_lib("arweave/include/ar_pricing.hrl"). 
@@ -51,34 +53,123 @@ validate(Args) -> {BlockStartOffset, RecallOffset, TXRoot, BlockSize, SPoA, Packing, SubChunkIndex, ExpectedChunkID} = Args, #poa{ chunk = Chunk, unpacked_chunk = UnpackedChunk } = SPoA, - TXPath = SPoA#poa.tx_path, - RecallBucketOffset = get_recall_bucket_offset(RecallOffset, BlockStartOffset), + + case validate_paths(SPoA, TXRoot, RecallOffset, BlockStartOffset, BlockSize) of + {false, _} -> + false; + {true, ChunkProof} -> + #chunk_proof{ + chunk_id = ChunkID, + chunk_start_offset = ChunkStartOffset, + chunk_end_offset = ChunkEndOffset, + tx_start_offset = TXStartOffset + } = ChunkProof, + case ExpectedChunkID of + not_set -> + validate2(Packing, {ChunkID, ChunkStartOffset, + ChunkEndOffset, BlockStartOffset, TXStartOffset, + TXRoot, Chunk, UnpackedChunk, SubChunkIndex}); + _ -> + case ChunkID == ExpectedChunkID of + false -> + false; + true -> + {true, ChunkID} + end + end + end. + +%% @doc Validate the TXPath and DataPath for a chunk. This will return the ChunkID but won't +%% validate that the ChunkID is correct. +%% +%% SPoA: the proof of access +%% RecallOffset: the absoluteoffset of the recall byte - + validate_paths(#poa{} = SPoA, TXRoot, RecallOffset, BlockStartOffset, BlockSize) -> + BlockRelativeOffset = get_recall_bucket_offset(RecallOffset, BlockStartOffset), ValidateDataPathRuleset = get_data_path_validation_ruleset(BlockStartOffset), - case ar_merkle:validate_path(TXRoot, RecallBucketOffset, BlockSize, TXPath) of + + Proof = #chunk_proof{ + absolute_offset = BlockStartOffset + BlockRelativeOffset, + tx_root = TXRoot, + tx_path = SPoA#poa.tx_path, + data_path = SPoA#poa.data_path, + block_start_offset = BlockStartOffset, + block_end_offset = BlockStartOffset + BlockSize, + validate_data_path_ruleset = ValidateDataPathRuleset + }, + validate_paths(Proof). + +%% @doc Validate the TXPath and DataPath for a chunk. This will return the ChunkID but won't +%% validate that the ChunkID is correct. 
+%% +%% AbsoluteOffset: the end offset of the chunk - indexed to the beginning of the weave +validate_paths(TXRoot, TXPath, DataPath, AbsoluteOffset) -> + {BlockStartOffset, BlockEndOffset, TXRoot} = + ar_block_index:get_block_bounds(AbsoluteOffset), + + Proof = #chunk_proof{ + absolute_offset = AbsoluteOffset, + tx_root = TXRoot, + tx_path = TXPath, + data_path = DataPath, + block_start_offset = BlockStartOffset, + block_end_offset = BlockEndOffset, + validate_data_path_ruleset = get_data_path_validation_ruleset(BlockStartOffset) + }, + validate_paths(Proof). + +validate_paths( + TXRoot, TXPath, DataPath, BlockStartOffset, BlockEndOffset, BlockRelativeOffset, + ValidateDataPathRuleset) -> + Proof = #chunk_proof{ + absolute_offset = BlockStartOffset + BlockRelativeOffset, + tx_root = TXRoot, + tx_path = TXPath, + data_path = DataPath, + block_start_offset = BlockStartOffset, + block_end_offset = BlockEndOffset, + validate_data_path_ruleset = ValidateDataPathRuleset + }, + validate_paths(Proof). 
+ +validate_paths(Proof) -> + #chunk_proof{ + absolute_offset = AbsoluteOffset, + tx_root = TXRoot, + tx_path = TXPath, + data_path = DataPath, + block_start_offset = BlockStartOffset, + block_end_offset = BlockEndOffset, + validate_data_path_ruleset = ValidateDataPathRuleset + } = Proof, + + BlockRelativeOffset = AbsoluteOffset - BlockStartOffset, + BlockSize = BlockEndOffset - BlockStartOffset, + + case ar_merkle:validate_path(TXRoot, BlockRelativeOffset, BlockSize, TXPath) of false -> - false; + {false, Proof#chunk_proof{ tx_path_is_valid = invalid }}; {DataRoot, TXStartOffset, TXEndOffset} -> + Proof2 = Proof#chunk_proof{ + data_root = DataRoot, + tx_start_offset = TXStartOffset, + tx_end_offset = TXEndOffset, + tx_path_is_valid = valid + }, TXSize = TXEndOffset - TXStartOffset, - RecallChunkOffset = RecallBucketOffset - TXStartOffset, - DataPath = SPoA#poa.data_path, - case ar_merkle:validate_path(DataRoot, RecallChunkOffset, TXSize, DataPath, - ValidateDataPathRuleset) of + TXRelativeOffset = BlockRelativeOffset - TXStartOffset, + case ar_merkle:validate_path( + DataRoot, TXRelativeOffset, TXSize, DataPath, ValidateDataPathRuleset) of false -> - false; + {false, Proof2#chunk_proof{ data_path_is_valid = invalid }}; {ChunkID, ChunkStartOffset, ChunkEndOffset} -> - case ExpectedChunkID of - not_set -> - validate2(Packing, {ChunkID, ChunkStartOffset, - ChunkEndOffset, BlockStartOffset, TXStartOffset, - TXRoot, Chunk, UnpackedChunk, SubChunkIndex}); - _ -> - case ChunkID == ExpectedChunkID of - false -> - false; - true -> - {true, ChunkID} - end - end + Proof3 = Proof2#chunk_proof{ + chunk_id = ChunkID, + chunk_start_offset = ChunkStartOffset, + chunk_end_offset = ChunkEndOffset, + data_path_is_valid = valid + }, + {true, Proof3} end end. 
diff --git a/apps/arweave/src/ar_sup.erl b/apps/arweave/src/ar_sup.erl index cb9279b1c..e601f5030 100644 --- a/apps/arweave/src/ar_sup.erl +++ b/apps/arweave/src/ar_sup.erl @@ -79,6 +79,7 @@ init([]) -> ?CHILD(ar_header_sync, worker), ?CHILD_SUP(ar_data_sync_sup, supervisor), ?CHILD_SUP(ar_chunk_storage_sup, supervisor), + ?CHILD_SUP(ar_verify_chunk_storage_sup, supervisor), ?CHILD(ar_global_sync_record, worker), ?CHILD_SUP(ar_nonce_limiter_server_sup, supervisor), ?CHILD(ar_nonce_limiter, worker), diff --git a/apps/arweave/src/ar_verify_chunk_storage.erl b/apps/arweave/src/ar_verify_chunk_storage.erl new file mode 100644 index 000000000..91f9bfe5f --- /dev/null +++ b/apps/arweave/src/ar_verify_chunk_storage.erl @@ -0,0 +1,566 @@ +%%% The blob storage optimized for fast reads. +-module(ar_verify_chunk_storage). + +-behaviour(gen_server). + + +-export([start_link/2, init/1, handle_cast/2, handle_call/3, handle_info/2, terminate/2]). + +-include_lib("arweave/include/ar.hrl"). +-include_lib("arweave/include/ar_consensus.hrl"). +-include_lib("eunit/include/eunit.hrl"). + +-record(report, { + total_error_bytes = 0 :: non_neg_integer(), + total_error_chunks = 0 :: non_neg_integer(), + error_bytes = #{} :: #{atom() => non_neg_integer()}, + error_chunks = #{} :: #{atom() => non_neg_integer()} +}). + +-record(state, { + store_id :: binary(), + packing :: binary(), + start_time :: non_neg_integer(), + start_offset :: non_neg_integer(), + end_offset :: non_neg_integer(), + cursor :: non_neg_integer(), + ready = false :: boolean(), + report = #report{} :: #report{} +}). + +-define(REPORT_PROGRESS_INTERVAL, 5000). + +%%%=================================================================== +%%% Public interface. +%%%=================================================================== + +%% @doc Start the server. +start_link(Name, StorageModule) -> + gen_server:start_link({local, Name}, ?MODULE, StorageModule, []). 
+ +%%%=================================================================== +%%% Generic server callbacks. +%%%=================================================================== + +init(StoreID) -> + ?LOG_ERROR([{event, verify_chunk_storage_started}, {store_id, StoreID}]), + {StartOffset, EndOffset} = ar_storage_module:get_range(StoreID), + gen_server:cast(self(), verify), + ar_util:cast_after(?REPORT_PROGRESS_INTERVAL, self(), report_progress), + {ok, #state{ + store_id = StoreID, + packing = ar_storage_module:get_packing(StoreID), + start_time = erlang:system_time(millisecond), + start_offset = StartOffset, + end_offset = EndOffset, + cursor = StartOffset, + ready = is_ready(EndOffset) + }}. + +handle_cast(verify, #state{ready = false, end_offset = EndOffset} = State) -> + ?LOG_ERROR([{event, not_ready}]), + ar_util:cast_after(1000, self(), verify), + {noreply, State#state{ready = is_ready(EndOffset)}}; +handle_cast(verify, + #state{cursor = Cursor, end_offset = EndOffset} = State) when Cursor >= EndOffset -> + ar:console("Done!~n"), + {noreply, State}; +handle_cast(verify, #state{store_id = StoreID} = State) -> + {noreply, verify(State)}; + +handle_cast(report_progress, #state{ready = false} = State) -> + ar_util:cast_after(?REPORT_PROGRESS_INTERVAL, self(), report_progress), + {noreply, State}; +handle_cast(report_progress, #state{cursor = Cursor, end_offset = EndOffset} = State) + when Cursor >= EndOffset -> + {noreply, State}; +handle_cast(report_progress, State) -> + #state{ + start_time = StartTime, + cursor = Cursor, + start_offset = StartOffset, + end_offset = EndOffset, + report = Report, + store_id = StoreID + } = State, + Bytes = Cursor - StartOffset, + MB = Bytes div 1000000, + Progress = Bytes * 100 div (EndOffset - StartOffset), + ElapsedTime = (erlang:system_time(millisecond) - StartTime) div 1000, + Rate = MB div ElapsedTime, + Intersection = ar_sync_record:get_intersection_size(EndOffset, StartOffset, invalid_chunks, StoreID), + 
ar:console("============== ~s ==============~n", [StoreID]), + ar:console("Verified ~B GB. ~B% done. ~B elapsed. ~B MB/s.~n", [MB div 1000, Progress, ElapsedTime, Rate]), + ar:console("Missing chunks: ~B~n", [Report#report.total_error_chunks]), + ar:console("Missing bytes: ~B MB~n", [Report#report.total_error_bytes div 1000000]), + ar:console("Missing bytes (padded): ~B MB~n", [Report#report.total_error_chunks * ?DATA_CHUNK_SIZE div 1000000]), + ar:console("Report: ~p~n", [Report]), + ar:console("Intervals: ~p~n", [Intersection]), + ar:console("=============================================================================================~n~n"), + ar_util:cast_after(?REPORT_PROGRESS_INTERVAL, self(), report_progress), + {noreply, State}; + +handle_cast(Cast, State) -> + ?LOG_WARNING([{event, unhandled_cast}, {module, ?MODULE}, {cast, Cast}]), + {noreply, State}. + +handle_call(Call, From, State) -> + ?LOG_WARNING([{event, unhandled_call}, {module, ?MODULE}, {call, Call}, {from, From}]), + {noreply, State}. + +handle_info(Info, State) -> + ?LOG_WARNING([{event, unhandled_info}, {module, ?MODULE}, {info, Info}]), + {noreply, State}. + +terminate(_Reason, _State) -> + ok. + +is_ready(EndOffset) -> + case ar_block_index:get_last() of + '$end_of_table' -> + false; + {WeaveSize, _Height, _H, _TXRoot} -> + WeaveSize >= EndOffset + end. + +verify(State) -> + #state{store_id = StoreID} = State, + {UnionInterval, Intervals} = query_intervals(State), + State2 = verify_chunks(UnionInterval, Intervals, State), + case State2#state.cursor >= State2#state.end_offset of + true -> + ar:console("Done verifying ~s!~n", [StoreID]), + ?LOG_INFO([{event, verify_chunk_storage_verify_chunks_done}, {store_id, StoreID}]); + false -> + gen_server:cast(self(), verify) + end, + State2. 
+ +verify_chunks(not_found, _Intervals, State) -> + State#state{ cursor = State#state.end_offset }; +verify_chunks({End, _Start}, _Intervals, #state{cursor = Cursor} = State) when Cursor >= End -> + State; +verify_chunks({IntervalEnd, IntervalStart}, Intervals, State) -> + #state{cursor = Cursor, store_id = StoreID} = State, + Cursor2 = max(IntervalStart, Cursor), + ChunkData = ar_data_sync:get_chunk_by_byte({chunks_index, StoreID}, Cursor2+1), + State2 = verify_chunk(ChunkData, Intervals, State#state{ cursor = Cursor2 }), + verify_chunks({IntervalEnd, IntervalStart}, Intervals, State2). + +verify_chunk({error, Reason}, _Intervals, State) -> + #state{ cursor = Cursor } = State, + log_error(get_chunk_error, Cursor, ?DATA_CHUNK_SIZE, [{reason, Reason}], State); +verify_chunk({ok, _Key, MetaData}, Intervals, State) -> + {AbsoluteOffset, _ChunkDataKey, _TXRoot, _DataRoot, _TXPath, + _TXRelativeOffset, ChunkSize} = MetaData, + {ChunkStorageInterval, _DataSyncInterval} = Intervals, + + State2 = verify_chunk_storage(AbsoluteOffset, ChunkSize, ChunkStorageInterval, State), + + State3 = verify_proof(MetaData, State2), + + PaddedOffset = ar_data_sync:get_chunk_padded_offset(AbsoluteOffset), + State3#state{ cursor = PaddedOffset }. + +verify_proof(MetaData, State) -> + #state{ store_id = StoreID } = State, + {AbsoluteOffset, ChunkDataKey, TXRoot, _DataRoot, TXPath, + _TXRelativeOffset, ChunkSize} = MetaData, + + case ar_data_sync:read_data_path({chunk_data_db, StoreID}, ChunkDataKey) of + {ok, DataPath} -> + case ar_poa:validate_paths(TXRoot, TXPath, DataPath, AbsoluteOffset - 1) of + {false, _Proof} -> + invalidate_chunk(validate_paths_error, AbsoluteOffset, ChunkSize, State); + {true, _Proof} -> + State + end; + Error -> + invalidate_chunk( + read_data_path_error, AbsoluteOffset, ChunkSize, [{reason, Error}], State) + end. 
+ +verify_chunk_storage(Offset, _ChunkSize, {End, Start}, State) + when Offset >= Start andalso Offset =< End -> + State; +verify_chunk_storage(Offset, ChunkSize, _Interval, State) -> + #state{ packing = Packing } = State, + case ar_data_sync:should_store_in_chunk_storage(Offset, ChunkSize, Packing) of + true -> + invalidate_chunk(chunk_storage_gap, Offset, ChunkSize, State); + false -> + State + end. + +invalidate_chunk(Type, Offset, ChunkSize, State) -> + invalidate_chunk(Type, Offset, ChunkSize, [], State). + +invalidate_chunk(Type, Offset, ChunkSize, Logs, State) -> + #state{ store_id = StoreID } = State, + ar_data_sync:invalidate_bad_data_record(Offset - ChunkSize, Offset, StoreID, 5), + log_error(Type, Offset, ChunkSize, Logs, State). + +log_error(Type, Offset, ChunkSize, Logs, State) -> + #state{ report = Report, store_id = StoreID, cursor = Cursor, packing = Packing } = State, + + LogMessage = [{event, verify_chunk_storage_error}, + {type, Type}, {store_id, StoreID}, + {packing, ar_serialize:encode_packing(Packing, true)}, + {offset, Offset}, {cursor, Cursor}, {chunk_size, ChunkSize}] ++ Logs, + ?LOG_INFO(LogMessage), + NewBytes = maps:get(Type, Report#report.error_bytes, 0) + ChunkSize, + NewChunks = maps:get(Type, Report#report.error_chunks, 0) + 1, + + Report2 = Report#report{ + total_error_bytes = Report#report.total_error_bytes + ChunkSize, + total_error_chunks = Report#report.total_error_chunks + 1, + error_bytes = maps:put(Type, NewBytes, Report#report.error_bytes), + error_chunks = maps:put(Type, NewChunks, Report#report.error_chunks) + }, + State#state{ report = Report2 }. + +query_intervals(State) -> + #state{cursor = Cursor, store_id = StoreID} = State, + {ChunkStorageInterval, DataSyncInterval} = align_intervals(Cursor, StoreID), + UnionInterval = union_intervals(ChunkStorageInterval, DataSyncInterval), + {UnionInterval, {ChunkStorageInterval, DataSyncInterval}}. 
+ +align_intervals(Cursor, StoreID) -> + ChunkStorageInterval = ar_sync_record:get_next_synced_interval( + Cursor, infinity, ar_chunk_storage, StoreID), + DataSyncInterval = ar_sync_record:get_next_synced_interval( + Cursor, infinity, ar_data_sync, StoreID), + align_intervals(Cursor, ChunkStorageInterval, DataSyncInterval). + +align_intervals(_Cursor, not_found, not_found) -> + {not_found, not_found}; +align_intervals(Cursor, not_found, DataSyncInterval) -> + {not_found, clamp_interval(Cursor, infinity, DataSyncInterval)}; +align_intervals(Cursor, ChunkStorageInterval, not_found) -> + {clamp_interval(Cursor, infinity, ChunkStorageInterval), not_found}; +align_intervals(Cursor, ChunkStorageInterval, DataSyncInterval) -> + {ChunkStorageEnd, _} = ChunkStorageInterval, + {DataSyncEnd, _} = DataSyncInterval, + + { + clamp_interval(Cursor, DataSyncEnd, ChunkStorageInterval), + clamp_interval(Cursor, ChunkStorageEnd, DataSyncInterval) + }. + +union_intervals(not_found, not_found) -> + not_found; +union_intervals(not_found, B) -> + B; +union_intervals(A, not_found) -> + A; +union_intervals({End1, Start1}, {End2, Start2}) -> + {max(End1, End2), min(Start1, Start2)}. + +clamp_interval(ClampMin, ClampMax, {End, Start}) -> + check_interval({min(End, ClampMax), max(Start, ClampMin)}). + +check_interval({End, Start}) when Start > End -> + not_found; +check_interval(Interval) -> + Interval. + +%% ar_chunk_storage does not store small chunks before strict_split_data_threshold +%% (before 30607159107830 = partitions 0-7 and a half of 8 +%% + +%%%=================================================================== +%%% Tests. +%%%=================================================================== + +intervals_test_() -> + [ + {timeout, 30, fun test_align_intervals/0}, + {timeout, 30, fun test_union_intervals/0} + ]. 
+ +verify_chunk_storage_test_() -> + [ + {timeout, 30, fun test_verify_chunk_storage_in_interval/0}, + {timeout, 30, fun test_verify_chunk_storage_should_store/0}, + {timeout, 30, fun test_verify_chunk_storage_should_not_store/0} + ]. + +verify_proof_test_() -> + [ + ar_test_node:test_with_mocked_functions([ + {ar_data_sync, read_data_path, fun(_, _) -> not_found end}], + fun test_verify_proof_no_datapath/0 + ), + ar_test_node:test_with_mocked_functions([ + {ar_data_sync, read_data_path, fun(_, _) -> {ok, <<>>} end}, + {ar_poa, validate_paths, fun(_, _, _, _) -> {true, <<>>} end} + ], + fun test_verify_proof_valid_paths/0 + ), + ar_test_node:test_with_mocked_functions([ + {ar_data_sync, read_data_path, fun(_, _) -> {ok, <<>>} end}, + {ar_poa, validate_paths, fun(_, _, _, _) -> {false, <<>>} end} + ], + fun test_verify_proof_invalid_paths/0 + ) + ]. + +verify_chunk_test_() -> + [ + ar_test_node:test_with_mocked_functions([ + {ar_data_sync, read_data_path, fun(_, _) -> {ok, <<>>} end}, + {ar_poa, validate_paths, fun(_, _, _, _) -> {true, <<>>} end} + ], + fun test_verify_chunk/0 + ) + + ]. 
+test_align_intervals() -> + ?assertEqual( + {not_found, not_found}, + align_intervals(0, not_found, not_found)), + ?assertEqual( + {{10, 5}, not_found}, + align_intervals(0, {10, 5}, not_found)), + ?assertEqual( + {{10, 7}, not_found}, + align_intervals(7, {10, 5}, not_found)), + ?assertEqual( + {not_found, not_found}, + align_intervals(12, {10, 5}, not_found)), + ?assertEqual( + {not_found, {10, 5}}, + align_intervals(0, not_found, {10, 5})), + ?assertEqual( + {not_found, {10, 7}}, + align_intervals(7, not_found, {10, 5})), + ?assertEqual( + {not_found, not_found}, + align_intervals(12, not_found, {10, 5})), + + ?assertEqual( + {{9, 4}, {9, 5}}, + align_intervals(0, {9, 4}, {10, 5})), + ?assertEqual( + {{9, 7}, {9, 7}}, + align_intervals(7, {9, 4}, {10, 5})), + ?assertEqual( + {not_found, not_found}, + align_intervals(12, {9, 4}, {10, 5})), + ?assertEqual( + {{9, 5}, {9, 4}}, + align_intervals(0, {10, 5}, {9, 4})), + ?assertEqual( + {{9, 7}, {9, 7}}, + align_intervals(7, {10, 5}, {9, 4})), + ?assertEqual( + {not_found, not_found}, + align_intervals(12, {10, 5}, {9, 4})), + ok. + +test_union_intervals() -> + ?assertEqual( + not_found, + union_intervals(not_found, not_found)), + ?assertEqual( + {10, 5}, + union_intervals(not_found, {10, 5})), + ?assertEqual( + {10, 5}, + union_intervals({10, 5}, not_found)), + ?assertEqual( + {10, 3}, + union_intervals({10, 7}, {5, 3})), + ok. + + +test_verify_chunk_storage_in_interval() -> + ?assertEqual( + #state{}, + verify_chunk_storage( + 10*?DATA_CHUNK_SIZE, + ?DATA_CHUNK_SIZE, + {20*?DATA_CHUNK_SIZE, 5*?DATA_CHUNK_SIZE}, + #state{})), + ?assertEqual( + #state{}, + verify_chunk_storage( + 5*?DATA_CHUNK_SIZE, + ?DATA_CHUNK_SIZE div 2, + {20*?DATA_CHUNK_SIZE, 5*?DATA_CHUNK_SIZE}, + #state{})), + ?assertEqual( + #state{}, + verify_chunk_storage( + 20*?DATA_CHUNK_SIZE, + ?DATA_CHUNK_SIZE div 2, + {20*?DATA_CHUNK_SIZE, 5*?DATA_CHUNK_SIZE}, + #state{})), + ok. 
+ +test_verify_chunk_storage_should_store() -> + Addr = crypto:strong_rand_bytes(32), + ExpectedState = #state{ + packing = unpacked, + report = #report{ + total_error_bytes = ?DATA_CHUNK_SIZE, + total_error_chunks = 1, + error_bytes = #{chunk_storage_gap => ?DATA_CHUNK_SIZE}, + error_chunks = #{chunk_storage_gap => 1} + } + }, + ?assertEqual( + ExpectedState, + verify_chunk_storage( + 0, + ?DATA_CHUNK_SIZE, + {20*?DATA_CHUNK_SIZE, 5*?DATA_CHUNK_SIZE}, + #state{ packing = unpacked })), + ?assertEqual( + ExpectedState, + verify_chunk_storage( + ?STRICT_DATA_SPLIT_THRESHOLD + 1, + ?DATA_CHUNK_SIZE, + {20*?DATA_CHUNK_SIZE, 5*?DATA_CHUNK_SIZE}, + #state{ packing = unpacked })), + ?assertEqual( + #state{ + packing = {composite, Addr, 1}, + report = #report{ + total_error_bytes = ?DATA_CHUNK_SIZE div 2, + total_error_chunks = 1, + error_bytes = #{chunk_storage_gap => ?DATA_CHUNK_SIZE div 2}, + error_chunks = #{chunk_storage_gap => 1} + } + }, + verify_chunk_storage( + ?STRICT_DATA_SPLIT_THRESHOLD + 1, + ?DATA_CHUNK_SIZE div 2, + {20*?DATA_CHUNK_SIZE, 5*?DATA_CHUNK_SIZE}, + #state{ packing = {composite, Addr, 1} })), + ok. + +test_verify_chunk_storage_should_not_store() -> + ExpectedState = #state{ + packing = unpacked + }, + ?assertEqual( + ExpectedState, + verify_chunk_storage( + 0, + ?DATA_CHUNK_SIZE div 2, + {20*?DATA_CHUNK_SIZE, 5*?DATA_CHUNK_SIZE}, + #state{ packing = unpacked })), + ?assertEqual( + ExpectedState, + verify_chunk_storage( + ?STRICT_DATA_SPLIT_THRESHOLD + 1, + ?DATA_CHUNK_SIZE div 2, + {20*?DATA_CHUNK_SIZE, 5*?DATA_CHUNK_SIZE}, + #state{ packing = unpacked })), + ok. 
+ +test_verify_proof_no_datapath() -> + ExpectedState1 = #state{ + packing = unpacked, + report = #report{ + total_error_bytes = ?DATA_CHUNK_SIZE, + total_error_chunks = 1, + error_bytes = #{read_data_path_error => ?DATA_CHUNK_SIZE}, + error_chunks = #{read_data_path_error => 1} + } + }, + ExpectedState2 = #state{ + packing = unpacked, + report = #report{ + total_error_bytes = ?DATA_CHUNK_SIZE div 2, + total_error_chunks = 1, + error_bytes = #{read_data_path_error => ?DATA_CHUNK_SIZE div 2}, + error_chunks = #{read_data_path_error => 1} + } + }, + ?assertEqual( + ExpectedState1, + verify_proof( + {10, <<>>, <<>>, <<>>, <<>>, <<>>, ?DATA_CHUNK_SIZE}, + #state{ packing = unpacked })), + ?assertEqual( + ExpectedState2, + verify_proof( + {10, <<>>, <<>>, <<>>, <<>>, <<>>, ?DATA_CHUNK_SIZE div 2}, + #state{ packing = unpacked })), + ok. + +test_verify_proof_valid_paths() -> + ?assertEqual( + #state{}, + verify_proof( + {10, <<>>, <<>>, <<>>, <<>>, <<>>, ?DATA_CHUNK_SIZE}, + #state{})), + ok. + +test_verify_proof_invalid_paths() -> + ExpectedState1 = #state{ + packing = unpacked, + report = #report{ + total_error_bytes = ?DATA_CHUNK_SIZE, + total_error_chunks = 1, + error_bytes = #{validate_paths_error => ?DATA_CHUNK_SIZE}, + error_chunks = #{validate_paths_error => 1} + } + }, + ExpectedState2 = #state{ + packing = unpacked, + report = #report{ + total_error_bytes = ?DATA_CHUNK_SIZE div 2, + total_error_chunks = 1, + error_bytes = #{validate_paths_error => ?DATA_CHUNK_SIZE div 2}, + error_chunks = #{validate_paths_error => 1} + } + }, + ?assertEqual( + ExpectedState1, + verify_proof( + {10, <<>>, <<>>, <<>>, <<>>, <<>>, ?DATA_CHUNK_SIZE}, + #state{ packing = unpacked })), + ?assertEqual( + ExpectedState2, + verify_proof( + {10, <<>>, <<>>, <<>>, <<>>, <<>>, ?DATA_CHUNK_SIZE div 2}, + #state{ packing = unpacked })), + ok. 
+ +test_verify_chunk() -> + PreSplitOffset = ?STRICT_DATA_SPLIT_THRESHOLD - (?DATA_CHUNK_SIZE div 2), + PostSplitOffset = ?STRICT_DATA_SPLIT_THRESHOLD + (?DATA_CHUNK_SIZE div 2), + IntervalStart = ?STRICT_DATA_SPLIT_THRESHOLD - ?DATA_CHUNK_SIZE, + IntervalEnd = ?STRICT_DATA_SPLIT_THRESHOLD + ?DATA_CHUNK_SIZE, + Interval = {IntervalEnd, IntervalStart}, + ?assertEqual( + #state{cursor = PreSplitOffset}, + verify_chunk( + {ok, <<>>, {PreSplitOffset, <<>>, <<>>, <<>>, <<>>, <<>>, ?DATA_CHUNK_SIZE div 2}}, + {Interval, not_found}, + #state{})), + ?assertEqual( + #state{cursor = ?STRICT_DATA_SPLIT_THRESHOLD + ?DATA_CHUNK_SIZE}, + verify_chunk( + {ok, <<>>, {PostSplitOffset, <<>>, <<>>, <<>>, <<>>, <<>>, ?DATA_CHUNK_SIZE div 2}}, + {Interval, not_found}, + #state{})), + ExpectedState = #state{ + packing = unpacked, + report = #report{ + total_error_bytes = ?DATA_CHUNK_SIZE, + total_error_chunks = 1, + error_bytes = #{get_chunk_error => ?DATA_CHUNK_SIZE}, + error_chunks = #{get_chunk_error => 1} + } + }, + ?assertEqual( + ExpectedState, + verify_chunk( + {error, some_error}, + {Interval, not_found}, + #state{ packing = unpacked })), + ok. diff --git a/apps/arweave/src/ar_verify_chunk_storage_sup.erl b/apps/arweave/src/ar_verify_chunk_storage_sup.erl new file mode 100644 index 000000000..cd073df12 --- /dev/null +++ b/apps/arweave/src/ar_verify_chunk_storage_sup.erl @@ -0,0 +1,40 @@ +-module(ar_verify_chunk_storage_sup). + +-behaviour(supervisor). + +-export([start_link/0]). + +-export([init/1]). + +-include_lib("arweave/include/ar_sup.hrl"). +-include_lib("arweave/include/ar_config.hrl"). + +%%%=================================================================== +%%% Public interface. +%%%=================================================================== + +start_link() -> + supervisor:start_link({local, ?MODULE}, ?MODULE, []). + +%% =================================================================== +%% Supervisor callbacks. 
+%% =================================================================== + +init([]) -> + {ok, Config} = application:get_env(arweave, config), + case Config#config.verify of + false -> + ignore; + true -> + Workers = lists:map( + fun(StorageModule) -> + StoreID = ar_storage_module:id(StorageModule), + Label = ar_storage_module:label(StorageModule), + Name = list_to_atom("ar_verify_chunk_storage_" ++ Label), + ?CHILD_WITH_ARGS(ar_verify_chunk_storage, worker, Name, [Name, StoreID]) + end, + Config#config.storage_modules + ), + {ok, {{one_for_one, 5, 10}, Workers}} + end. + From 8b0bd467a385f7df0d737fe853eb4888ab854b03 Mon Sep 17 00:00:00 2001 From: James Piechota Date: Wed, 6 Nov 2024 20:31:00 -0500 Subject: [PATCH 2/8] chore: rename ar_verify_chunk_storage -> ar_verify_chunks --- .github/workflows/test.yml | 1 + apps/arweave/src/ar_sup.erl | 2 +- .../{ar_verify_chunk_storage.erl => ar_verify_chunks.erl} | 2 +- ...erify_chunk_storage_sup.erl => ar_verify_chunks_sup.erl} | 6 +++--- 4 files changed, 6 insertions(+), 5 deletions(-) rename apps/arweave/src/{ar_verify_chunk_storage.erl => ar_verify_chunks.erl} (99%) rename apps/arweave/src/{ar_verify_chunk_storage_sup.erl => ar_verify_chunks_sup.erl} (84%) diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index 211abc1ef..122ede81f 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -76,6 +76,7 @@ jobs: ar_tx_db, ar_unbalanced_merkle, ar_util, + ar_verify_chunks, ar_wallet, ar_webhook, ar_pool, diff --git a/apps/arweave/src/ar_sup.erl b/apps/arweave/src/ar_sup.erl index e601f5030..a6608e82f 100644 --- a/apps/arweave/src/ar_sup.erl +++ b/apps/arweave/src/ar_sup.erl @@ -79,7 +79,7 @@ init([]) -> ?CHILD(ar_header_sync, worker), ?CHILD_SUP(ar_data_sync_sup, supervisor), ?CHILD_SUP(ar_chunk_storage_sup, supervisor), - ?CHILD_SUP(ar_verify_chunk_storage_sup, supervisor), + ?CHILD_SUP(ar_verify_chunks_sup, supervisor), ?CHILD(ar_global_sync_record, worker), 
?CHILD_SUP(ar_nonce_limiter_server_sup, supervisor), ?CHILD(ar_nonce_limiter, worker), diff --git a/apps/arweave/src/ar_verify_chunk_storage.erl b/apps/arweave/src/ar_verify_chunks.erl similarity index 99% rename from apps/arweave/src/ar_verify_chunk_storage.erl rename to apps/arweave/src/ar_verify_chunks.erl index 91f9bfe5f..d4b702907 100644 --- a/apps/arweave/src/ar_verify_chunk_storage.erl +++ b/apps/arweave/src/ar_verify_chunks.erl @@ -1,5 +1,5 @@ %%% The blob storage optimized for fast reads. --module(ar_verify_chunk_storage). +-module(ar_verify_chunks). -behaviour(gen_server). diff --git a/apps/arweave/src/ar_verify_chunk_storage_sup.erl b/apps/arweave/src/ar_verify_chunks_sup.erl similarity index 84% rename from apps/arweave/src/ar_verify_chunk_storage_sup.erl rename to apps/arweave/src/ar_verify_chunks_sup.erl index cd073df12..b0dff2bcf 100644 --- a/apps/arweave/src/ar_verify_chunk_storage_sup.erl +++ b/apps/arweave/src/ar_verify_chunks_sup.erl @@ -1,4 +1,4 @@ --module(ar_verify_chunk_storage_sup). +-module(ar_verify_chunks_sup). -behaviour(supervisor). 
@@ -30,8 +30,8 @@ init([]) -> fun(StorageModule) -> StoreID = ar_storage_module:id(StorageModule), Label = ar_storage_module:label(StorageModule), - Name = list_to_atom("ar_verify_chunk_storage_" ++ Label), - ?CHILD_WITH_ARGS(ar_verify_chunk_storage, worker, Name, [Name, StoreID]) + Name = list_to_atom("ar_verify_chunks_" ++ Label), + ?CHILD_WITH_ARGS(ar_verify_chunks, worker, Name, [Name, StoreID]) end, Config#config.storage_modules ), From f5a20c3fb4edebc73b22f2e1637f549be6e9a6ae Mon Sep 17 00:00:00 2001 From: James Piechota Date: Thu, 7 Nov 2024 10:32:15 -0500 Subject: [PATCH 3/8] fix: finish implementing verification report --- apps/arweave/include/ar_verify_chunks.hrl | 14 ++ apps/arweave/src/ar_verify_chunks.erl | 124 +++++++----------- .../arweave/src/ar_verify_chunks_reporter.erl | 103 +++++++++++++++ apps/arweave/src/ar_verify_chunks_sup.erl | 6 +- 4 files changed, 171 insertions(+), 76 deletions(-) create mode 100644 apps/arweave/include/ar_verify_chunks.hrl create mode 100644 apps/arweave/src/ar_verify_chunks_reporter.erl diff --git a/apps/arweave/include/ar_verify_chunks.hrl b/apps/arweave/include/ar_verify_chunks.hrl new file mode 100644 index 000000000..9ab4fe12e --- /dev/null +++ b/apps/arweave/include/ar_verify_chunks.hrl @@ -0,0 +1,14 @@ +-ifndef(AR_VERIFY_CHUNKS_HRL). +-define(AR_VERIFY_CHUNKS_HRL, true). + +-record(verify_report, { + start_time :: non_neg_integer(), + total_error_bytes = 0 :: non_neg_integer(), + total_error_chunks = 0 :: non_neg_integer(), + error_bytes = #{} :: #{atom() => non_neg_integer()}, + error_chunks = #{} :: #{atom() => non_neg_integer()}, + bytes_processed = 0 :: non_neg_integer(), + progress = 0 :: non_neg_integer() +}). + +-endif. diff --git a/apps/arweave/src/ar_verify_chunks.erl b/apps/arweave/src/ar_verify_chunks.erl index d4b702907..208d8831f 100644 --- a/apps/arweave/src/ar_verify_chunks.erl +++ b/apps/arweave/src/ar_verify_chunks.erl @@ -1,35 +1,25 @@ -%%% The blob storage optimized for fast reads. 
-module(ar_verify_chunks). -behaviour(gen_server). - --export([start_link/2, init/1, handle_cast/2, handle_call/3, handle_info/2, terminate/2]). +-export([start_link/2, name/1]). +-export([init/1, handle_cast/2, handle_call/3, handle_info/2, terminate/2]). -include_lib("arweave/include/ar.hrl"). -include_lib("arweave/include/ar_consensus.hrl"). +-include_lib("arweave/include/ar_verify_chunks.hrl"). -include_lib("eunit/include/eunit.hrl"). --record(report, { - total_error_bytes = 0 :: non_neg_integer(), - total_error_chunks = 0 :: non_neg_integer(), - error_bytes = #{} :: #{atom() => non_neg_integer()}, - error_chunks = #{} :: #{atom() => non_neg_integer()} -}). - -record(state, { store_id :: binary(), packing :: binary(), - start_time :: non_neg_integer(), start_offset :: non_neg_integer(), end_offset :: non_neg_integer(), cursor :: non_neg_integer(), ready = false :: boolean(), - report = #report{} :: #report{} + verify_report = #verify_report{} :: #verify_report{} }). --define(REPORT_PROGRESS_INTERVAL, 5000). - %%%=================================================================== %%% Public interface. %%%=================================================================== @@ -38,67 +28,41 @@ start_link(Name, StorageModule) -> gen_server:start_link({local, Name}, ?MODULE, StorageModule, []). +-spec name(binary()) -> atom(). +name(StoreID) -> + list_to_atom("ar_verify_chunks_" ++ ar_storage_module:label_by_id(StoreID)). + %%%=================================================================== %%% Generic server callbacks. 
%%%=================================================================== init(StoreID) -> - ?LOG_ERROR([{event, verify_chunk_storage_started}, {store_id, StoreID}]), + ?LOG_INFO([{event, verify_chunk_storage_started}, {store_id, StoreID}]), {StartOffset, EndOffset} = ar_storage_module:get_range(StoreID), gen_server:cast(self(), verify), - ar_util:cast_after(?REPORT_PROGRESS_INTERVAL, self(), report_progress), {ok, #state{ store_id = StoreID, packing = ar_storage_module:get_packing(StoreID), - start_time = erlang:system_time(millisecond), start_offset = StartOffset, end_offset = EndOffset, cursor = StartOffset, - ready = is_ready(EndOffset) + ready = is_ready(EndOffset), + verify_report = #verify_report{ + start_time = erlang:system_time(millisecond) + } }}. handle_cast(verify, #state{ready = false, end_offset = EndOffset} = State) -> - ?LOG_ERROR([{event, not_ready}]), ar_util:cast_after(1000, self(), verify), {noreply, State#state{ready = is_ready(EndOffset)}}; handle_cast(verify, #state{cursor = Cursor, end_offset = EndOffset} = State) when Cursor >= EndOffset -> ar:console("Done!~n"), {noreply, State}; -handle_cast(verify, #state{store_id = StoreID} = State) -> - {noreply, verify(State)}; - -handle_cast(report_progress, #state{ready = false} = State) -> - ar_util:cast_after(?REPORT_PROGRESS_INTERVAL, self(), report_progress), - {noreply, State}; -handle_cast(report_progress, #state{cursor = Cursor, end_offset = EndOffset} = State) - when Cursor >= EndOffset -> - {noreply, State}; -handle_cast(report_progress, State) -> - #state{ - start_time = StartTime, - cursor = Cursor, - start_offset = StartOffset, - end_offset = EndOffset, - report = Report, - store_id = StoreID - } = State, - Bytes = Cursor - StartOffset, - MB = Bytes div 1000000, - Progress = Bytes * 100 div (EndOffset - StartOffset), - ElapsedTime = (erlang:system_time(millisecond) - StartTime) div 1000, - Rate = MB div ElapsedTime, - Intersection = ar_sync_record:get_intersection_size(EndOffset, 
StartOffset, invalid_chunks, StoreID), - ar:console("============== ~s ==============~n", [StoreID]), - ar:console("Verified ~B GB. ~B% done. ~B elapsed. ~B MB/s.~n", [MB div 1000, Progress, ElapsedTime, Rate]), - ar:console("Missing chunks: ~B~n", [Report#report.total_error_chunks]), - ar:console("Missing bytes: ~B MB~n", [Report#report.total_error_bytes div 1000000]), - ar:console("Missing bytes (padded): ~B MB~n", [Report#report.total_error_chunks * ?DATA_CHUNK_SIZE div 1000000]), - ar:console("Report: ~p~n", [Report]), - ar:console("Intervals: ~p~n", [Intersection]), - ar:console("=============================================================================================~n~n"), - ar_util:cast_after(?REPORT_PROGRESS_INTERVAL, self(), report_progress), - {noreply, State}; +handle_cast(verify, State) -> + State2 = verify(State), + State3 = report_progress(State2), + {noreply, State3}; handle_cast(Cast, State) -> ?LOG_WARNING([{event, unhandled_cast}, {module, ?MODULE}, {cast, Cast}]), @@ -106,7 +70,7 @@ handle_cast(Cast, State) -> handle_call(Call, From, State) -> ?LOG_WARNING([{event, unhandled_call}, {module, ?MODULE}, {call, Call}, {from, From}]), - {noreply, State}. + {reply, ok, State}. handle_info(Info, State) -> ?LOG_WARNING([{event, unhandled_info}, {module, ?MODULE}, {info, Info}]), @@ -201,23 +165,23 @@ invalidate_chunk(Type, Offset, ChunkSize, Logs, State) -> log_error(Type, Offset, ChunkSize, Logs, State). 
log_error(Type, Offset, ChunkSize, Logs, State) -> - #state{ report = Report, store_id = StoreID, cursor = Cursor, packing = Packing } = State, + #state{ verify_report = Report, store_id = StoreID, cursor = Cursor, packing = Packing } = State, LogMessage = [{event, verify_chunk_storage_error}, {type, Type}, {store_id, StoreID}, {packing, ar_serialize:encode_packing(Packing, true)}, {offset, Offset}, {cursor, Cursor}, {chunk_size, ChunkSize}] ++ Logs, ?LOG_INFO(LogMessage), - NewBytes = maps:get(Type, Report#report.error_bytes, 0) + ChunkSize, - NewChunks = maps:get(Type, Report#report.error_chunks, 0) + 1, - - Report2 = Report#report{ - total_error_bytes = Report#report.total_error_bytes + ChunkSize, - total_error_chunks = Report#report.total_error_chunks + 1, - error_bytes = maps:put(Type, NewBytes, Report#report.error_bytes), - error_chunks = maps:put(Type, NewChunks, Report#report.error_chunks) + NewBytes = maps:get(Type, Report#verify_report.error_bytes, 0) + ChunkSize, + NewChunks = maps:get(Type, Report#verify_report.error_chunks, 0) + 1, + + Report2 = Report#verify_report{ + total_error_bytes = Report#verify_report.total_error_bytes + ChunkSize, + total_error_chunks = Report#verify_report.total_error_chunks + 1, + error_bytes = maps:put(Type, NewBytes, Report#verify_report.error_bytes), + error_chunks = maps:put(Type, NewChunks, Report#verify_report.error_chunks) }, - State#state{ report = Report2 }. + State#state{ verify_report = Report2 }. query_intervals(State) -> #state{cursor = Cursor, store_id = StoreID} = State, @@ -264,6 +228,20 @@ check_interval({End, Start}) when Start > End -> check_interval(Interval) -> Interval. 
+report_progress(State) -> + #state{ + store_id = StoreID, verify_report = Report, cursor = Cursor, + start_offset = StartOffset, end_offset = EndOffset + } = State, + BytesProcessed = Cursor - StartOffset, + Progress = BytesProcessed * 100 div (EndOffset - StartOffset), + Report2 = Report#verify_report{ + bytes_processed = BytesProcessed, + progress = Progress + }, + ar_verify_chunks_reporter:update(StoreID, Report2), + State#state{ verify_report = Report2 }. + %% ar_chunk_storage does not store small chunks before strict_split_data_threshold %% (before 30607159107830 = partitions 0-7 and a half of 8 %% @@ -313,8 +291,8 @@ verify_chunk_test_() -> ], fun test_verify_chunk/0 ) - - ]. + ]. + test_align_intervals() -> ?assertEqual( {not_found, not_found}, @@ -402,7 +380,7 @@ test_verify_chunk_storage_should_store() -> Addr = crypto:strong_rand_bytes(32), ExpectedState = #state{ packing = unpacked, - report = #report{ + verify_report = #verify_report{ total_error_bytes = ?DATA_CHUNK_SIZE, total_error_chunks = 1, error_bytes = #{chunk_storage_gap => ?DATA_CHUNK_SIZE}, @@ -426,7 +404,7 @@ test_verify_chunk_storage_should_store() -> ?assertEqual( #state{ packing = {composite, Addr, 1}, - report = #report{ + verify_report = #verify_report{ total_error_bytes = ?DATA_CHUNK_SIZE div 2, total_error_chunks = 1, error_bytes = #{chunk_storage_gap => ?DATA_CHUNK_SIZE div 2}, @@ -463,7 +441,7 @@ test_verify_chunk_storage_should_not_store() -> test_verify_proof_no_datapath() -> ExpectedState1 = #state{ packing = unpacked, - report = #report{ + verify_report = #verify_report{ total_error_bytes = ?DATA_CHUNK_SIZE, total_error_chunks = 1, error_bytes = #{read_data_path_error => ?DATA_CHUNK_SIZE}, @@ -472,7 +450,7 @@ test_verify_proof_no_datapath() -> }, ExpectedState2 = #state{ packing = unpacked, - report = #report{ + verify_report = #verify_report{ total_error_bytes = ?DATA_CHUNK_SIZE div 2, total_error_chunks = 1, error_bytes = #{read_data_path_error => ?DATA_CHUNK_SIZE div 2}, @@ 
-502,7 +480,7 @@ test_verify_proof_valid_paths() -> test_verify_proof_invalid_paths() -> ExpectedState1 = #state{ packing = unpacked, - report = #report{ + verify_report = #verify_report{ total_error_bytes = ?DATA_CHUNK_SIZE, total_error_chunks = 1, error_bytes = #{validate_paths_error => ?DATA_CHUNK_SIZE}, @@ -511,7 +489,7 @@ test_verify_proof_invalid_paths() -> }, ExpectedState2 = #state{ packing = unpacked, - report = #report{ + verify_report = #verify_report{ total_error_bytes = ?DATA_CHUNK_SIZE div 2, total_error_chunks = 1, error_bytes = #{validate_paths_error => ?DATA_CHUNK_SIZE div 2}, @@ -550,7 +528,7 @@ test_verify_chunk() -> #state{})), ExpectedState = #state{ packing = unpacked, - report = #report{ + verify_report = #verify_report{ total_error_bytes = ?DATA_CHUNK_SIZE, total_error_chunks = 1, error_bytes = #{get_chunk_error => ?DATA_CHUNK_SIZE}, diff --git a/apps/arweave/src/ar_verify_chunks_reporter.erl b/apps/arweave/src/ar_verify_chunks_reporter.erl new file mode 100644 index 000000000..1c03d03f0 --- /dev/null +++ b/apps/arweave/src/ar_verify_chunks_reporter.erl @@ -0,0 +1,103 @@ +%%% The blob storage optimized for fast reads. +-module(ar_verify_chunks_reporter). + +-behaviour(gen_server). + +-export([start_link/0, update/2]). +-export([init/1, handle_cast/2, handle_call/3, handle_info/2, terminate/2]). + +-include_lib("arweave/include/ar.hrl"). +-include_lib("arweave/include/ar_verify_chunks.hrl"). +-include_lib("eunit/include/eunit.hrl"). + +-record(state, { + reports = #{} :: #{binary() => #verify_report{}} +}). + +-define(REPORT_PROGRESS_INTERVAL, 10000). + +%%%=================================================================== +%%% Public interface. +%%%=================================================================== + +%% @doc Start the server. +start_link() -> + gen_server:start_link({local, ?MODULE}, ?MODULE, [], []). + +-spec update(binary(), #verify_report{}) -> ok. 
+update(StoreID, Report) -> + gen_server:cast(?MODULE, {update, StoreID, Report}). + +%%%=================================================================== +%%% Generic server callbacks. +%%%=================================================================== + +init([]) -> + ar_util:cast_after(?REPORT_PROGRESS_INTERVAL, self(), report_progress), + {ok, #state{}}. + + +handle_cast({update, StoreID, Report}, State) -> + {noreply, State#state{ reports = maps:put(StoreID, Report, State#state.reports) }}; + +handle_cast(report_progress, State) -> + #state{ + reports = Reports + } = State, + + print_reports(Reports), + ar_util:cast_after(?REPORT_PROGRESS_INTERVAL, self(), report_progress), + {noreply, State}; + +handle_cast(Cast, State) -> + ?LOG_WARNING([{event, unhandled_cast}, {module, ?MODULE}, {cast, Cast}]), + {noreply, State}. + +handle_call(Call, From, State) -> + ?LOG_WARNING([{event, unhandled_call}, {module, ?MODULE}, {call, Call}, {from, From}]), + {reply, ok, State}. + +handle_info(Info, State) -> + ?LOG_WARNING([{event, unhandled_info}, {module, ?MODULE}, {info, Info}]), + {noreply, State}. + +terminate(_Reason, _State) -> + ok. + +print_reports(Reports) when map_size(Reports) == 0 -> + ok; +print_reports(Reports) -> + print_header(), + maps:foreach( + fun(StoreID, Report) -> + print_report(StoreID, Report) + end, + Reports + ), + print_footer(), + ok. + +print_header() -> + ar:console("Verification Report~n", []), + ar:console("+-------------------------------------------------------------------+-----------+------+----------+-------------+~n", []), + ar:console("| Storage Module | Processed | % | Errors | Verify Rate |~n", []), + ar:console("+-------------------------------------------------------------------+-----------+------+----------+-------------+~n", []). + +print_footer() -> + ar:console("+-------------------------------------------------------------------+-----------+------+----------+-------------+~n~n", []). 
+ +print_report(StoreID, Report) -> + #verify_report{ + total_error_bytes = TotalErrorBytes, + bytes_processed = BytesProcessed, + progress = Progress, + start_time = StartTime + } = Report, + Duration = erlang:system_time(millisecond) - StartTime, + Rate = 1000 * BytesProcessed / Duration, + ar:console("| ~65s | ~4B GB | ~3B% | ~5.1f GB | ~6.1f MB/s |~n", + [ + StoreID, BytesProcessed div 1000000000, Progress, + TotalErrorBytes / 1000000000, Rate / 1000000 + ] + ). \ No newline at end of file diff --git a/apps/arweave/src/ar_verify_chunks_sup.erl b/apps/arweave/src/ar_verify_chunks_sup.erl index b0dff2bcf..9d81d65c9 100644 --- a/apps/arweave/src/ar_verify_chunks_sup.erl +++ b/apps/arweave/src/ar_verify_chunks_sup.erl @@ -29,12 +29,12 @@ init([]) -> Workers = lists:map( fun(StorageModule) -> StoreID = ar_storage_module:id(StorageModule), - Label = ar_storage_module:label(StorageModule), - Name = list_to_atom("ar_verify_chunks_" ++ Label), + Name = ar_verify_chunks:name(StoreID), ?CHILD_WITH_ARGS(ar_verify_chunks, worker, Name, [Name, StoreID]) end, Config#config.storage_modules ), - {ok, {{one_for_one, 5, 10}, Workers}} + Reporter = ?CHILD(ar_verify_chunks_reporter, worker), + {ok, {{one_for_one, 5, 10}, [Reporter | Workers]}} end. 
From fff3223a1f95c2bf6da3d8c2e033ebb66c1fc1c2 Mon Sep 17 00:00:00 2001 From: James Piechota Date: Thu, 7 Nov 2024 14:32:32 -0500 Subject: [PATCH 4/8] fix: disable a bunch of features when `verify` is on --- apps/arweave/src/ar.erl | 7 +-- apps/arweave/src/ar_config.erl | 44 +++++++++++++++---- apps/arweave/src/ar_node_worker.erl | 2 +- apps/arweave/src/ar_nonce_limiter.erl | 33 ++++++-------- .../src/ar_nonce_limiter_server_sup.erl | 2 +- 5 files changed, 56 insertions(+), 32 deletions(-) diff --git a/apps/arweave/src/ar.erl b/apps/arweave/src/ar.erl index 473cadf59..360dea491 100644 --- a/apps/arweave/src/ar.erl +++ b/apps/arweave/src/ar.erl @@ -678,10 +678,11 @@ start(Config) -> timer:sleep(2000), erlang:halt() end, - ok = application:set_env(arweave, config, Config), - filelib:ensure_dir(Config#config.log_dir ++ "/"), + Config2 = ar_config:set_dependent_flags(Config), + ok = application:set_env(arweave, config, Config2), + filelib:ensure_dir(Config2#config.log_dir ++ "/"), warn_if_single_scheduler(), - case Config#config.nonce_limiter_server_trusted_peers of + case Config2#config.nonce_limiter_server_trusted_peers of [] -> VDFSpeed = ar_bench_vdf:run_benchmark(), ?LOG_INFO([{event, vdf_benchmark}, {vdf_s, VDFSpeed / 1000000}]); diff --git a/apps/arweave/src/ar_config.erl b/apps/arweave/src/ar_config.erl index ad2479f90..116ef35a2 100644 --- a/apps/arweave/src/ar_config.erl +++ b/apps/arweave/src/ar_config.erl @@ -1,6 +1,6 @@ -module(ar_config). --export([validate_config/1, auto_join/0, verify/0, use_remote_vdf_server/0, +-export([validate_config/1, set_dependent_flags/1, use_remote_vdf_server/0, pull_from_remote_vdf_server/0, compute_own_vdf/0, is_vdf_server/0, is_public_vdf_server/0, parse/1, parse_storage_module/1, log_config/1]). @@ -13,6 +13,7 @@ %%% Public interface. %%%=================================================================== +-spec validate_config(Config :: #config{}) -> boolean(). 
validate_config(Config) -> validate_init(Config) andalso validate_storage_modules(Config) andalso @@ -21,13 +22,11 @@ validate_config(Config) -> validate_packing_difficulty(Config) andalso validate_verify(Config). -auto_join() -> - {ok, Config} = application:get_env(arweave, config), - Config#config.auto_join andalso not Config#config.verify. - -verify() -> - {ok, Config} = application:get_env(arweave, config), - Config#config.verify. +-spec set_dependent_flags(Config :: #config{}) -> #config{}. +%% @doc Some flags force other flags to be set. +set_dependent_flags(Config) -> + Config2 = set_verify_flags(Config), + Config2. use_remote_vdf_server() -> {ok, Config} = application:get_env(arweave, config), @@ -990,3 +989,32 @@ validate_verify(#config{ verify = true, false; validate_verify(_Config) -> true. + +disable_vdf(Config) -> + RemovePublicVDFServer = + lists:filter(fun(Item) -> Item =/= public_vdf_server end, Config#config.enable), + Config#config{ + nonce_limiter_client_peers = [], + nonce_limiter_server_trusted_peers = [], + enable = RemovePublicVDFServer, + disable = [compute_own_vdf | Config#config.disable] + }. + +set_verify_flags(#config{ verify = false } = Config) -> + Config; +set_verify_flags(Config) -> + io:format("~n~nWARNING: The verify flag is set. Forcing the following options:"), + io:format("~n - auto_join = false"), + io:format("~n - start_from_latest_state = true"), + io:format("~n - sync_jobs = 0"), + io:format("~n - block_pollers = 0"), + io:format("~n - header_sync_jobs = 0"), + io:format("~n - all VDF features disabled"), + Config2 = disable_vdf(Config), + Config2#config{ + auto_join = false, + start_from_latest_state = true, + sync_jobs = 0, + block_pollers = 0, + header_sync_jobs = 0 + }. 
diff --git a/apps/arweave/src/ar_node_worker.erl b/apps/arweave/src/ar_node_worker.erl index 6ba61fba7..ebedf43c0 100644 --- a/apps/arweave/src/ar_node_worker.erl +++ b/apps/arweave/src/ar_node_worker.erl @@ -93,7 +93,7 @@ init([]) -> validate_trusted_peers(Config), StartFromLocalState = Config#config.start_from_latest_state orelse Config#config.start_from_block /= undefined, - case {StartFromLocalState, Config#config.init, ar_config:auto_join()} of + case {StartFromLocalState, Config#config.init, Config#config.auto_join} of {false, false, true} -> ar_join:start(ar_peers:get_trusted_peers()); {true, _, _} -> diff --git a/apps/arweave/src/ar_nonce_limiter.erl b/apps/arweave/src/ar_nonce_limiter.erl index 90d9441e5..afbcd2bf2 100644 --- a/apps/arweave/src/ar_nonce_limiter.erl +++ b/apps/arweave/src/ar_nonce_limiter.erl @@ -408,27 +408,22 @@ apply_external_update(Update, Peer) -> %%%=================================================================== init([]) -> - case ar_config:verify() of + ok = ar_events:subscribe(node_state), + State = + case ar_node:is_joined() of + true -> + Blocks = get_blocks(), + handle_initialized(Blocks, #state{}); + _ -> + #state{} + end, + case ar_config:use_remote_vdf_server() and not ar_config:compute_own_vdf() of true -> - ignore; + gen_server:cast(?MODULE, check_external_vdf_server_input); false -> - ok = ar_events:subscribe(node_state), - State = - case ar_node:is_joined() of - true -> - Blocks = get_blocks(), - handle_initialized(Blocks, #state{}); - _ -> - #state{} - end, - case ar_config:use_remote_vdf_server() and not ar_config:compute_own_vdf() of - true -> - gen_server:cast(?MODULE, check_external_vdf_server_input); - false -> - ok - end, - {ok, start_worker(State#state{ autocompute = ar_config:compute_own_vdf() })} - end. + ok + end, + {ok, start_worker(State#state{ autocompute = ar_config:compute_own_vdf() })}. 
get_blocks() -> B = ar_node:get_current_block(), diff --git a/apps/arweave/src/ar_nonce_limiter_server_sup.erl b/apps/arweave/src/ar_nonce_limiter_server_sup.erl index eab8540a4..fb3898e2a 100644 --- a/apps/arweave/src/ar_nonce_limiter_server_sup.erl +++ b/apps/arweave/src/ar_nonce_limiter_server_sup.erl @@ -21,7 +21,7 @@ start_link() -> %% =================================================================== init([]) -> - case ar_config:is_vdf_server() andalso not ar_config:verify() of + case ar_config:is_vdf_server() of false -> ignore; true -> From f796b4815b18b9670860fdd00d1797c75a6bc57b Mon Sep 17 00:00:00 2001 From: James Piechota Date: Thu, 7 Nov 2024 15:17:49 -0500 Subject: [PATCH 5/8] fixup! feat: Introduce tool to verify a storage module --- apps/arweave/src/ar_data_sync.erl | 2 +- apps/arweave/src/ar_verify_chunks.erl | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/apps/arweave/src/ar_data_sync.erl b/apps/arweave/src/ar_data_sync.erl index bdb11616b..9d3a0e5c4 100644 --- a/apps/arweave/src/ar_data_sync.erl +++ b/apps/arweave/src/ar_data_sync.erl @@ -730,7 +730,7 @@ handle_cast({join, RecentBI}, State) -> PreviousWeaveSize = element(2, hd(CurrentBI)), {ok, OrphanedDataRoots} = remove_orphaned_data(State, Offset, PreviousWeaveSize), {ok, Config} = application:get_env(arweave, config), - [gen_server:cast(name(Module), + [gen_server:cast(name(ar_storage_module:id(Module)), {cut, Offset}) || Module <- Config#config.storage_modules], ok = ar_chunk_storage:cut(Offset, StoreID), ok = ar_sync_record:cut(Offset, ?MODULE, StoreID), diff --git a/apps/arweave/src/ar_verify_chunks.erl b/apps/arweave/src/ar_verify_chunks.erl index 208d8831f..1be67d4c9 100644 --- a/apps/arweave/src/ar_verify_chunks.erl +++ b/apps/arweave/src/ar_verify_chunks.erl @@ -145,7 +145,7 @@ verify_proof(MetaData, State) -> end. 
verify_chunk_storage(Offset, _ChunkSize, {End, Start}, State) - when Offset >= Start andalso Offset =< End -> + when Offset - ?DATA_CHUNK_SIZE >= Start andalso Offset =< End -> State; verify_chunk_storage(Offset, ChunkSize, _Interval, State) -> #state{ packing = Packing } = State, From 6578c188bd8565d293d7a3ca7a9b639e1a4df01f Mon Sep 17 00:00:00 2001 From: Lev Berman Date: Wed, 6 Nov 2024 17:26:11 +0100 Subject: [PATCH 6/8] WIP Repack (in place) unpacked chunks stored in RocksDB --- apps/arweave/src/ar_chunk_storage.erl | 264 +++++++++++++++++--------- apps/arweave/src/ar_data_sync.erl | 19 +- apps/arweave/src/ar_verify_chunks.erl | 2 +- 3 files changed, 173 insertions(+), 112 deletions(-) diff --git a/apps/arweave/src/ar_chunk_storage.erl b/apps/arweave/src/ar_chunk_storage.erl index 45a26e3fc..e17b905a7 100644 --- a/apps/arweave/src/ar_chunk_storage.erl +++ b/apps/arweave/src/ar_chunk_storage.erl @@ -3,7 +3,7 @@ -behaviour(gen_server). --export([start_link/2, put/2, put/3, +-export([start_link/2, name/1, is_storage_supported/3, put/2, put/3, open_files/1, get/1, get/2, get/5, read_chunk2/5, get_range/2, get_range/3, close_file/2, close_files/1, cut/2, delete/1, delete/2, list_files/2, run_defragmentation/0]). @@ -36,6 +36,34 @@ start_link(Name, StoreID) -> gen_server:start_link({local, Name}, ?MODULE, StoreID, []). +%% @doc Return the name of the server serving the given StoreID. +name(StoreID) -> + list_to_atom("ar_chunk_storage_" ++ ar_storage_module:label_by_id(StoreID)). + +%% @doc Return true if we can accept the chunk for storage. +%% 256 KiB chunks are stored in the blob storage optimized for read speed. +%% Unpacked chunks smaller than 256 KiB cannot be stored here currently, +%% because the module does not keep track of the chunk sizes - all chunks +%% are assumed to be 256 KiB. +-spec is_storage_supported( + Offset :: non_neg_integer(), + ChunkSize :: non_neg_integer(), + Packing :: term() +) -> true | false. 
+ +is_storage_supported(Offset, ChunkSize, Packing) -> + case Offset > ?STRICT_DATA_SPLIT_THRESHOLD of + true -> + %% All chunks above ?STRICT_DATA_SPLIT_THRESHOLD are placed in 256 KiB buckets + %% so technically can be stored in ar_chunk_storage. However, to avoid + %% managing padding in ar_chunk_storage for unpacked chunks smaller than 256 KiB + %% (we do not need fast random access to unpacked chunks after + %% ?STRICT_DATA_SPLIT_THRESHOLD anyways), we put them to RocksDB. + Packing /= unpacked orelse ChunkSize == (?DATA_CHUNK_SIZE); + false -> + ChunkSize == (?DATA_CHUNK_SIZE) + end. + %% @doc Store the chunk under the given end offset, %% bytes Offset - ?DATA_CHUNK_SIZE, Offset - ?DATA_CHUNK_SIZE + 1, .., Offset - 1. put(PaddedOffset, Chunk) -> @@ -397,7 +425,7 @@ get_chunk_group_size() -> Config#config.chunk_storage_file_size. read_repack_cursor(StoreID, TargetPacking) -> - Filepath = get_filepath("repack_in_place_cursor", StoreID), + Filepath = get_filepath("repack_in_place_cursor2", StoreID), case file:read_file(Filepath) of {ok, Bin} -> case catch binary_to_term(Bin) of @@ -411,7 +439,7 @@ read_repack_cursor(StoreID, TargetPacking) -> end. remove_repack_cursor(StoreID) -> - Filepath = get_filepath("repack_in_place_cursor", StoreID), + Filepath = get_filepath("repack_in_place_cursor2", StoreID), case file:delete(Filepath) of ok -> ok; @@ -424,7 +452,7 @@ remove_repack_cursor(StoreID) -> store_repack_cursor(0, _StoreID, _TargetPacking) -> ok; store_repack_cursor(Cursor, StoreID, TargetPacking) -> - Filepath = get_filepath("repack_in_place_cursor", StoreID), + Filepath = get_filepath("repack_in_place_cursor2", StoreID), file:write_file(Filepath, term_to_binary({Cursor, TargetPacking})). get_filepath(Name, StoreID) -> @@ -758,23 +786,26 @@ chunk_offset_list_to_map(ChunkOffsets) -> chunk_offset_list_to_map(ChunkOffsets, infinity, 0, #{}). 
repack(Cursor, RightBound, Packing, StoreID) -> - case ar_sync_record:get_next_synced_interval(Cursor, RightBound, ?MODULE, StoreID) of + case ar_sync_record:get_next_synced_interval(Cursor, RightBound, + ar_data_sync, StoreID) of not_found -> ar:console("~n~nRepacking of ~s is complete! " "We suggest you stop the node, rename " - "the storage module folder to reflect the new packing, and start the " + "the storage module folder to reflect " + "the new packing, and start the " "node with the new storage module.~n", [StoreID]), ?LOG_INFO([{event, repacking_complete}, {storage_module, StoreID}, - {target_packing, ar_serialize:encode_packing(Packing, true)}]), + {target_packing, + ar_serialize:encode_packing(Packing, true)}]), Server = list_to_atom("ar_chunk_storage_" ++ ar_storage_module:label_by_id(StoreID)), gen_server:cast(Server, repacking_complete), ok; {End, Start} -> Start2 = max(Cursor, Start), - case ar_sync_record:get_next_synced_interval(Start2, End, Packing, ar_data_sync, - StoreID) of + case ar_sync_record:get_next_synced_interval(Start2, End, + Packing, ar_data_sync, StoreID) of not_found -> repack(Start2, End, End, RightBound, Packing, StoreID); {End3, Start3} when Start3 > Start2 -> @@ -789,7 +820,10 @@ repack(Start, End, NextCursor, RightBound, Packing, StoreID) when Start >= End - repack(Start, End, NextCursor, RightBound, RequiredPacking, StoreID) -> {ok, Config} = application:get_env(arweave, config), RepackIntervalSize = ?DATA_CHUNK_SIZE * Config#config.repack_batch_size, - Server = list_to_atom("ar_chunk_storage_" ++ ar_storage_module:label_by_id(StoreID)), + Server = name(StoreID), + Start2 = Start + RepackIntervalSize, + RepackFurtherArgs = {repack, Start2, End, NextCursor, RightBound, + RequiredPacking}, CheckPackingBuffer = case ar_packing_server:is_buffer_full() of true -> @@ -804,104 +838,146 @@ repack(Start, End, NextCursor, RightBound, RequiredPacking, StoreID) -> continue -> continue; ok -> - case catch get_range(Start, 
RepackIntervalSize, StoreID) of - [] -> - Start2 = Start + RepackIntervalSize, - gen_server:cast(Server, {repack, Start2, End, NextCursor, RightBound, - RequiredPacking}), - continue; - {'EXIT', _Exc} -> - ?LOG_ERROR([{event, failed_to_read_chunk_range}, - {storage_module, StoreID}, - {start, Start}, - {size, RepackIntervalSize}, - {store_id, StoreID}]), - Start2 = Start + RepackIntervalSize, - gen_server:cast(Server, {repack, Start2, End, NextCursor, RightBound, - RequiredPacking}), - continue; - Range -> - {ok, Range} - end + repack_read_chunk_range(Start, RepackIntervalSize, + StoreID, RepackFurtherArgs) end, ReadMetadataRange = case ReadRange of continue -> continue; {ok, Range2} -> - {Min, Max, Map} = chunk_offset_list_to_map(Range2), - case ar_data_sync:get_chunk_metadata_range(Min, min(Max, End), StoreID) of - {ok, MetadataMap} -> - {ok, Map, MetadataMap}; - {error, Error} -> - ?LOG_ERROR([{event, failed_to_read_chunk_metadata_range}, - {storage_module, StoreID}, - {error, io_lib:format("~p", [Error])}, - {left, Min}, - {right, Max}]), - Start3 = Start + RepackIntervalSize, - gen_server:cast(Server, {repack, Start3, End, NextCursor, RightBound, - RequiredPacking}), - continue - end + repack_read_chunk_metadata_range(StoreID, Range2, End, + RepackFurtherArgs) end, case ReadMetadataRange of continue -> ok; {ok, Map2, MetadataMap2} -> - Start4 = Start + RepackIntervalSize, - gen_server:cast(Server, {repack, Start4, End, NextCursor, RightBound, - RequiredPacking}), - maps:fold( - fun (AbsoluteOffset, {_, _TXRoot, _, _, _, ChunkSize}, ok) - when ChunkSize /= ?DATA_CHUNK_SIZE, - AbsoluteOffset =< ?STRICT_DATA_SPLIT_THRESHOLD -> - ok; - (AbsoluteOffset, {_, TXRoot, _, _, _, ChunkSize}, ok) -> - PaddedOffset = ar_data_sync:get_chunk_padded_offset(AbsoluteOffset), - case ar_sync_record:is_recorded(PaddedOffset, ar_data_sync, StoreID) of - {true, RequiredPacking} -> - ?LOG_WARNING([{event, - repacking_process_chunk_already_repacked}, - {storage_module, StoreID}, - 
{packing, - ar_serialize:encode_packing(RequiredPacking,true)}, - {offset, AbsoluteOffset}]), - ok; - {true, Packing} -> - case maps:get(PaddedOffset, Map2, not_found) of + gen_server:cast(Server, {repack, Start2, End, NextCursor, + RightBound, RequiredPacking}), + Args = {StoreID, RequiredPacking, Map2, Server}, + repack_send_chunks_for_repacking(MetadataMap2, Args) + end. + +repack_read_chunk_range(Start, Size, StoreID, RepackFurtherArgs) -> + Server = name(StoreID), + case catch get_range(Start, Size, StoreID) of + [] -> + gen_server:cast(Server, RepackFurtherArgs), + continue; + {'EXIT', _Exc} -> + ?LOG_ERROR([{event, failed_to_read_chunk_range}, + {storage_module, StoreID}, + {start, Start}, + {size, Size}]), + gen_server:cast(Server, RepackFurtherArgs), + continue; + Range -> + {ok, Range} + end. + +repack_read_chunk_metadata_range(StoreID, Range, End, RepackFurtherArgs) -> + Server = name(StoreID), + {Min, Max, Map} = chunk_offset_list_to_map(Range), % TODO + case ar_data_sync:get_chunk_metadata_range(Min, min(Max, End), StoreID) of + {ok, MetadataMap} -> + {ok, Map, MetadataMap}; + {error, Error} -> + ?LOG_ERROR([{event, failed_to_read_chunk_metadata_range}, + {storage_module, StoreID}, + {error, io_lib:format("~p", [Error])}, + {left, Min}, + {right, Max}]), + gen_server:cast(Server, RepackFurtherArgs), + continue + end. + +repack_send_chunks_for_repacking(MetadataMap, Args) -> + maps:fold(repack_send_chunks_for_repacking(Args), ok, MetadataMap). + +repack_send_chunks_for_repacking(Args) -> + fun (AbsoluteOffset, {_, _TXRoot, _, _, _, ChunkSize}, ok) + when ChunkSize /= ?DATA_CHUNK_SIZE, + AbsoluteOffset =< ?STRICT_DATA_SPLIT_THRESHOLD -> + ok; + (AbsoluteOffset, {ChunkDataKey, TXRoot, _, _, _, ChunkSize}, ok) -> + repack_send_chunk_for_repacking(AbsoluteOffset, + ChunkDataKey, TXRoot, ChunkSize, Args) + end. 
+ +repack_send_chunk_for_repacking(AbsoluteOffset, ChunkDataKey, + TXRoot, ChunkSize, Args) -> + {StoreID, RequiredPacking, ChunkMap} = Args, + Server = name(StoreID), + PaddedOffset = ar_data_sync:get_chunk_padded_offset(AbsoluteOffset), + case ar_sync_record:is_recorded(PaddedOffset, ar_data_sync, StoreID) of + {true, RequiredPacking} -> + ?LOG_WARNING([{event, repacking_process_chunk_already_repacked}, + {storage_module, StoreID}, + {packing, + ar_serialize:encode_packing(RequiredPacking, true)}, + {offset, AbsoluteOffset}]), + ok; + {true, Packing} -> + Chunk = + case maps:get(PaddedOffset, ChunkMap, not_found) of + not_found -> + case is_storage_supported(PaddedOffset, ChunkSize, Packing) of + false -> + %% We do not store unpacked chunks in + %% ar_chunk_storage. % TODO + not_found; + true -> + case ar_kv:get({chunk_data_db, StoreID}, ChunkDataKey) of not_found -> ?LOG_WARNING([{event, - chunk_not_found_in_chunk_storage}, - {storage_module, StoreID}, - {offset, PaddedOffset}]), - ok; - Chunk -> - Ref = make_ref(), - gen_server:cast(Server, - {register_packing_ref, Ref, PaddedOffset}), - ar_util:cast_after(300000, Server, - {expire_repack_request, Ref}), - ar_packing_server:request_repack(Ref, whereis(Server), - {RequiredPacking, Packing, Chunk, - AbsoluteOffset, TXRoot, ChunkSize}), - ok - end; - true -> - ?LOG_WARNING([{event, no_packing_information_for_the_chunk}, - {storage_module, StoreID}, - {offset, PaddedOffset}]), - ok; - false -> - ?LOG_WARNING([{event, chunk_not_found_in_sync_record}, - {storage_module, StoreID}, - {offset, PaddedOffset}]), - ok - end + entry_not_found_in_chunk_data_db}, + {type, repack_in_place}, + {storage_module, StoreID}, + {offset, AbsoluteOffset}, + {padded_offset, PaddedOffset}]), + not_found; + {ok, V} -> + case binary_to_term(V) of + {Chunk2, _DataPath} -> + Chunk2; + _ -> + ?LOG_WARNING([{event, + chunk_neither_in_chunk_data_db_nor_in_chunk_storage}, + {type, repack_in_place}, + {storage_module, StoreID}, + {offset, 
AbsoluteOffset}, + {padded_offset, PaddedOffset}]), + not_found + end + end + end; + Chunk3 -> + Chunk3 end, - ok, - MetadataMap2 - ) + case Chunk of + not_found -> + ok; + _ -> + Ref = make_ref(), + gen_server:cast(Server, + {register_packing_ref, Ref, PaddedOffset}), + ar_util:cast_after(300000, Server, + {expire_repack_request, Ref}), + ar_packing_server:request_repack(Ref, whereis(Server), + {RequiredPacking, Packing, Chunk, + AbsoluteOffset, TXRoot, ChunkSize}) + end; + true -> + ?LOG_WARNING([{event, no_packing_information_for_the_chunk}, + {storage_module, StoreID}, + {offset, PaddedOffset}]), + ok; + false -> + ?LOG_WARNING([{event, chunk_not_found_in_sync_record}, + {storage_module, StoreID}, + {offset, PaddedOffset}]), + ok end. chunk_offset_list_to_map([], Min, Max, Map) -> diff --git a/apps/arweave/src/ar_data_sync.erl b/apps/arweave/src/ar_data_sync.erl index 9d3a0e5c4..3b4843487 100644 --- a/apps/arweave/src/ar_data_sync.erl +++ b/apps/arweave/src/ar_data_sync.erl @@ -12,7 +12,7 @@ get_chunk_by_byte/2, get_chunk_seek_offset/1, read_chunk/4, read_data_path/2, increment_chunk_cache_size/0, decrement_chunk_cache_size/0, get_chunk_padded_offset/1, get_chunk_metadata_range/3, - get_merkle_rebase_threshold/0, should_store_in_chunk_storage/3]). + get_merkle_rebase_threshold/0]). -export([debug_get_disk_pool_chunks/0]). @@ -2736,7 +2736,7 @@ write_chunk(Offset, ChunkDataKey, Chunk, ChunkSize, DataPath, Packing, State) -> write_not_blacklisted_chunk(Offset, ChunkDataKey, Chunk, ChunkSize, DataPath, Packing, State) -> #sync_data_state{ chunk_data_db = ChunkDataDB, store_id = StoreID } = State, - ShouldStoreInChunkStorage = should_store_in_chunk_storage(Offset, ChunkSize, Packing), + ShouldStoreInChunkStorage = ar_chunk_storage:is_storage_supported(Offset, ChunkSize, Packing), Result = case ShouldStoreInChunkStorage of true -> @@ -2757,21 +2757,6 @@ write_not_blacklisted_chunk(Offset, ChunkDataKey, Chunk, ChunkSize, DataPath, Pa Result end. 
-%% @doc 256 KiB chunks are stored in the blob storage optimized for read speed. -%% Return true if we want to place the chunk there. -should_store_in_chunk_storage(Offset, ChunkSize, Packing) -> - case Offset > ?STRICT_DATA_SPLIT_THRESHOLD of - true -> - %% All chunks above ?STRICT_DATA_SPLIT_THRESHOLD are placed in 256 KiB buckets - %% so technically can be stored in ar_chunk_storage. However, to avoid - %% managing padding in ar_chunk_storage for unpacked chunks smaller than 256 KiB - %% (we do not need fast random access to unpacked chunks after - %% ?STRICT_DATA_SPLIT_THRESHOLD anyways), we put them to RocksDB. - Packing /= unpacked orelse ChunkSize == (?DATA_CHUNK_SIZE); - false -> - ChunkSize == (?DATA_CHUNK_SIZE) - end. - update_chunks_index(Args, State) -> AbsoluteChunkOffset = element(1, Args), case ar_tx_blacklist:is_byte_blacklisted(AbsoluteChunkOffset) of diff --git a/apps/arweave/src/ar_verify_chunks.erl b/apps/arweave/src/ar_verify_chunks.erl index 1be67d4c9..66184f863 100644 --- a/apps/arweave/src/ar_verify_chunks.erl +++ b/apps/arweave/src/ar_verify_chunks.erl @@ -149,7 +149,7 @@ verify_chunk_storage(Offset, _ChunkSize, {End, Start}, State) State; verify_chunk_storage(Offset, ChunkSize, _Interval, State) -> #state{ packing = Packing } = State, - case ar_data_sync:should_store_in_chunk_storage(Offset, ChunkSize, Packing) of + case ar_chunk_storage:is_storage_supported(Offset, ChunkSize, Packing) of true -> invalidate_chunk(chunk_storage_gap, Offset, ChunkSize, State); false -> From 90dec57733c072faf320a22de5ca1f6d0a40964c Mon Sep 17 00:00:00 2001 From: Lev Berman Date: Thu, 7 Nov 2024 15:13:34 +0100 Subject: [PATCH 7/8] fixup --- apps/arweave/src/ar_chunk_storage.erl | 179 ++++++++++++++++---------- apps/arweave/src/ar_data_sync.erl | 10 ++ 2 files changed, 124 insertions(+), 65 deletions(-) diff --git a/apps/arweave/src/ar_chunk_storage.erl b/apps/arweave/src/ar_chunk_storage.erl index e17b905a7..a36bfcc84 100644 --- 
a/apps/arweave/src/ar_chunk_storage.erl +++ b/apps/arweave/src/ar_chunk_storage.erl @@ -1,4 +1,4 @@ -%%% The blob storage optimized for fast reads. +%% The blob storage optimized for fast reads. -module(ar_chunk_storage). -behaviour(gen_server). @@ -315,8 +315,8 @@ handle_cast({repack, Start, End, NextCursor, RightBound, Packing}, spawn(fun() -> repack(Start, End, NextCursor, RightBound, Packing, StoreID) end), {noreply, State}; -handle_cast({register_packing_ref, Ref, Offset}, #state{ packing_map = Map } = State) -> - {noreply, State#state{ packing_map = maps:put(Ref, Offset, Map) }}; +handle_cast({register_packing_ref, Ref, Args}, #state{ packing_map = Map } = State) -> + {noreply, State#state{ packing_map = maps:put(Ref, Args, Map) }}; handle_cast({expire_repack_request, Ref}, #state{ packing_map = Map } = State) -> {noreply, State#state{ packing_map = maps:remove(Ref, Map) }}; @@ -371,33 +371,69 @@ handle_info({chunk, {packed, Ref, ChunkArgs}}, case maps:get(Ref, Map, not_found) of not_found -> {noreply, State}; - Offset -> + Args -> State2 = State#state{ packing_map = maps:remove(Ref, Map) }, - {Packing, Chunk, _, _, _} = ChunkArgs, - case ar_sync_record:delete(Offset, Offset - ?DATA_CHUNK_SIZE, - ar_data_sync, StoreID) of + {Packing, Chunk, Offset, _, ChunkSize} = ChunkArgs, + StartOffset = Offset - ?DATA_CHUNK_SIZE, + RemoveFromSyncRecordResult = ar_sync_record:delete(Offset, + StartOffset, ar_data_sync, StoreID), + IsStorageSupported = + case RemoveFromSyncRecordResult of + ok -> + is_storage_supported(Offset, ChunkSize, Packing); + Error -> + Error + end, + RemoveFromChunkStorageSyncRecordResult = + case IsStorageSupported of + true -> + store; + false -> + %% Based on the new packing we do not want to + %% store the chunk in the chunk storage anymore so + %% we also remove the record from the + %% chunk-storage specific sync record and + %% send the chunk to the corresponding ar_data_sync + %% module to store it in RocksDB. 
+ ar_sync_record:delete(Offset, StartOffset, + ?MODULE, StoreID); + Error2 -> + Error2 + end, + case RemoveFromChunkStorageSyncRecordResult of ok -> + DataSyncServer = ar_data_sync:name(StoreID), + gen_server:cast(DataSyncServer, + {store_chunk, ChunkArgs, Args}), + {noreply, State2#state{ repack_cursor = Offset, + prev_repack_cursor = PrevCursor }}; + store -> case handle_store_chunk(Offset, Chunk, FileIndex, StoreID) of {ok, FileIndex2} -> ar_sync_record:add_async(repacked_chunk, - Offset, Offset - ?DATA_CHUNK_SIZE, + Offset, StartOffset, Packing, ar_data_sync, StoreID), {noreply, State2#state{ file_index = FileIndex2, - repack_cursor = Offset, prev_repack_cursor = PrevCursor }}; - Error2 -> + repack_cursor = Offset, + prev_repack_cursor = PrevCursor }}; + Error3 -> + PackingStr = ar_serialize:encode_packing(Packing, true), ?LOG_ERROR([{event, failed_to_store_repacked_chunk}, + {type, repack_in_place}, {storage_module, StoreID}, {offset, Offset}, - {packing, ar_serialize:encode_packing(Packing, true)}, - {error, io_lib:format("~p", [Error2])}]), + {packing, PackingStr}, + {error, io_lib:format("~p", [Error3])}]), {noreply, State2} end; - Error3 -> - ?LOG_ERROR([{event, failed_to_remove_repacked_chunk_from_sync_record}, + Error4 -> + PackingStr = ar_serialize:encode_packing(Packing, true), + ?LOG_ERROR([{event, failed_to_store_repacked_chunk}, + {type, repack_in_place}, {storage_module, StoreID}, {offset, Offset}, - {packing, ar_serialize:encode_packing(Packing, true)}, - {error, io_lib:format("~p", [Error3])}]), + {packing, PackingStr}, + {error, io_lib:format("~p", [Error4])}]), {noreply, State2} end end; @@ -828,7 +864,8 @@ repack(Start, End, NextCursor, RightBound, RequiredPacking, StoreID) -> case ar_packing_server:is_buffer_full() of true -> ar_util:cast_after(200, Server, - {repack, Start, End, NextCursor, RightBound, RequiredPacking}), + {repack, Start, End, + NextCursor, RightBound, RequiredPacking}), continue; false -> ok @@ -846,8 +883,8 @@ 
repack(Start, End, NextCursor, RightBound, RequiredPacking, StoreID) -> continue -> continue; {ok, Range2} -> - repack_read_chunk_metadata_range(StoreID, Range2, End, - RepackFurtherArgs) + repack_read_chunk_metadata_range(Start, RepackIntervalSize, End, + Range2, StoreID, RepackFurtherArgs) end, case ReadMetadataRange of continue -> @@ -855,7 +892,7 @@ repack(Start, End, NextCursor, RightBound, RequiredPacking, StoreID) -> {ok, Map2, MetadataMap2} -> gen_server:cast(Server, {repack, Start2, End, NextCursor, RightBound, RequiredPacking}), - Args = {StoreID, RequiredPacking, Map2, Server}, + Args = {StoreID, RequiredPacking, Map2}, repack_send_chunks_for_repacking(MetadataMap2, Args) end. @@ -876,18 +913,18 @@ repack_read_chunk_range(Start, Size, StoreID, RepackFurtherArgs) -> {ok, Range} end. -repack_read_chunk_metadata_range(StoreID, Range, End, RepackFurtherArgs) -> +repack_read_chunk_metadata_range(Start, Size, End, + Range, StoreID, RepackFurtherArgs) -> Server = name(StoreID), - {Min, Max, Map} = chunk_offset_list_to_map(Range), % TODO - case ar_data_sync:get_chunk_metadata_range(Min, min(Max, End), StoreID) of + End2 = min(Start + Size, End), + {_, _, Map} = chunk_offset_list_to_map(Range), + case ar_data_sync:get_chunk_metadata_range(Start, End2, StoreID) of {ok, MetadataMap} -> {ok, Map, MetadataMap}; {error, Error} -> ?LOG_ERROR([{event, failed_to_read_chunk_metadata_range}, {storage_module, StoreID}, - {error, io_lib:format("~p", [Error])}, - {left, Min}, - {right, Max}]), + {error, io_lib:format("~p", [Error])}]), gen_server:cast(Server, RepackFurtherArgs), continue end. 
@@ -900,16 +937,16 @@ repack_send_chunks_for_repacking(Args) -> when ChunkSize /= ?DATA_CHUNK_SIZE, AbsoluteOffset =< ?STRICT_DATA_SPLIT_THRESHOLD -> ok; - (AbsoluteOffset, {ChunkDataKey, TXRoot, _, _, _, ChunkSize}, ok) -> - repack_send_chunk_for_repacking(AbsoluteOffset, - ChunkDataKey, TXRoot, ChunkSize, Args) + (AbsoluteOffset, ChunkMeta, ok) -> + repack_send_chunk_for_repacking(AbsoluteOffset, ChunkMeta, Args) end. -repack_send_chunk_for_repacking(AbsoluteOffset, ChunkDataKey, - TXRoot, ChunkSize, Args) -> +repack_send_chunk_for_repacking(AbsoluteOffset, ChunkMeta, Args) -> {StoreID, RequiredPacking, ChunkMap} = Args, Server = name(StoreID), PaddedOffset = ar_data_sync:get_chunk_padded_offset(AbsoluteOffset), + {ChunkDataKey, TXRoot, DataRoot, TXPath, + RelativeOffset, ChunkSize} = ChunkMeta, case ar_sync_record:is_recorded(PaddedOffset, ar_data_sync, StoreID) of {true, RequiredPacking} -> ?LOG_WARNING([{event, repacking_process_chunk_already_repacked}, @@ -919,49 +956,37 @@ repack_send_chunk_for_repacking(AbsoluteOffset, ChunkDataKey, {offset, AbsoluteOffset}]), ok; {true, Packing} -> - Chunk = + ChunkMaybeDataPath = case maps:get(PaddedOffset, ChunkMap, not_found) of not_found -> - case is_storage_supported(PaddedOffset, ChunkSize, Packing) of + repack_read_chunk_and_data_path(StoreID, + ChunkDataKey, AbsoluteOffset, no_chunk); + Chunk3 -> + case is_storage_supported(AbsoluteOffset, + ChunkSize, Packing) of false -> - %% We do not store unpacked chunks in - %% ar_chunk_storage. % TODO - not_found; + %% We are going to move this chunk to + %% RocksDB after repacking so we read + %% its DataPath here to pass it later on + %% to store_chunk. 
+ repack_read_chunk_and_data_path(StoreID, + ChunkDataKey, AbsoluteOffset, Chunk3); true -> - case ar_kv:get({chunk_data_db, StoreID}, ChunkDataKey) of - not_found -> - ?LOG_WARNING([{event, - entry_not_found_in_chunk_data_db}, - {type, repack_in_place}, - {storage_module, StoreID}, - {offset, AbsoluteOffset}, - {padded_offset, PaddedOffset}]), - not_found; - {ok, V} -> - case binary_to_term(V) of - {Chunk2, _DataPath} -> - Chunk2; - _ -> - ?LOG_WARNING([{event, - chunk_neither_in_chunk_data_db_nor_in_chunk_storage}, - {type, repack_in_place}, - {storage_module, StoreID}, - {offset, AbsoluteOffset}, - {padded_offset, PaddedOffset}]), - not_found - end - end - end; - Chunk3 -> - Chunk3 + %% We are going to repack the chunk and keep it + %% in the chunk storage - no need to make an + %% extra disk access to read the data path. + {Chunk3, none} + end end, - case Chunk of + case ChunkMaybeDataPath of not_found -> ok; - _ -> + {Chunk, MaybeDataPath} -> Ref = make_ref(), + RepackArgs = {Packing, MaybeDataPath, RelativeOffset, + DataRoot, TXPath, none, none}, gen_server:cast(Server, - {register_packing_ref, Ref, PaddedOffset}), + {register_packing_ref, RepackArgs}), ar_util:cast_after(300000, Server, {expire_repack_request, Ref}), ar_packing_server:request_repack(Ref, whereis(Server), @@ -980,6 +1005,30 @@ repack_send_chunk_for_repacking(AbsoluteOffset, ChunkDataKey, ok end. 
+repack_read_chunk_and_data_path(StoreID, ChunkDataKey, AbsoluteOffset, + MaybeChunk) -> + case ar_kv:get({chunk_data_db, StoreID}, ChunkDataKey) of + not_found -> + ?LOG_WARNING([{event, chunk_not_found}, + {type, repack_in_place}, + {storage_module, StoreID}, + {offset, AbsoluteOffset}]), + not_found; + {ok, V} -> + case binary_to_term(V) of + {Chunk, DataPath} -> + {Chunk, DataPath}; + DataPath when MaybeChunk /= no_chunk -> + {MaybeChunk, DataPath}; + _ -> + ?LOG_WARNING([{event, chunk_not_found2}, + {type, repack_in_place}, + {storage_module, StoreID}, + {offset, AbsoluteOffset}]), + not_found + end + end. + chunk_offset_list_to_map([], Min, Max, Map) -> {Min, Max, Map}; chunk_offset_list_to_map([{Offset, Chunk} | ChunkOffsets], Min, Max, Map) -> diff --git a/apps/arweave/src/ar_data_sync.erl b/apps/arweave/src/ar_data_sync.erl index 3b4843487..c6d1c6983 100644 --- a/apps/arweave/src/ar_data_sync.erl +++ b/apps/arweave/src/ar_data_sync.erl @@ -867,6 +867,16 @@ handle_cast({pack_and_store_chunk, Args} = Cast, {noreply, State} end; +handle_cast({store_chunk, ChunkArgs, Args} = Cast, + #sync_data_state{ store_id = StoreID } = State) -> + case is_disk_space_sufficient(StoreID) of + true -> + {noreply, store_chunk(ChunkArgs, Args, State)}; + _ -> + ar_util:cast_after(30000, self(), Cast), + {noreply, State} + end; + %% Schedule syncing of the unsynced intervals. Choose a peer for each of the intervals. %% There are two message payloads: %% 1. 
collect_peer_intervals From 8b1b9ce693bb098be2175bc5000da2dda2295031 Mon Sep 17 00:00:00 2001 From: Lev Berman Date: Thu, 7 Nov 2024 21:20:24 +0100 Subject: [PATCH 8/8] fixup --- apps/arweave/src/ar_chunk_storage.erl | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/apps/arweave/src/ar_chunk_storage.erl b/apps/arweave/src/ar_chunk_storage.erl index a36bfcc84..f0e34a4d9 100644 --- a/apps/arweave/src/ar_chunk_storage.erl +++ b/apps/arweave/src/ar_chunk_storage.erl @@ -986,7 +986,7 @@ repack_send_chunk_for_repacking(AbsoluteOffset, ChunkMeta, Args) -> RepackArgs = {Packing, MaybeDataPath, RelativeOffset, DataRoot, TXPath, none, none}, gen_server:cast(Server, - {register_packing_ref, RepackArgs}), + {register_packing_ref, Ref, RepackArgs}), ar_util:cast_after(300000, Server, {expire_repack_request, Ref}), ar_packing_server:request_repack(Ref, whereis(Server),