From 03e2894d22b0692b77a5f281f4a1fa04e326a96b Mon Sep 17 00:00:00 2001 From: ttt161 Date: Tue, 10 Jun 2025 15:10:50 +0300 Subject: [PATCH 01/12] update prod profile --- rebar.config | 1 + 1 file changed, 1 insertion(+) diff --git a/rebar.config b/rebar.config index 7d4e8e3..41c6939 100644 --- a/rebar.config +++ b/rebar.config @@ -76,6 +76,7 @@ {tools, load}, % log formatter {logger_logstash_formatter, load}, + {canal, load}, % main app {machinegun, permanent} ]}, From e8f4447deae07a0c0ff5ed2663de685d63e93acb Mon Sep 17 00:00:00 2001 From: ttt161 Date: Wed, 11 Jun 2025 09:34:58 +0300 Subject: [PATCH 02/12] debug canal --- rebar.config | 3 ++- rebar.lock | 12 ++++++++---- 2 files changed, 10 insertions(+), 5 deletions(-) diff --git a/rebar.config b/rebar.config index 41c6939..d6abc9f 100644 --- a/rebar.config +++ b/rebar.config @@ -51,7 +51,8 @@ {deps, [ {genlib, {git, "https://github.com/valitydev/genlib", {tag, "v1.1.0"}}}, - {progressor, {git, "https://github.com/valitydev/progressor.git", {tag, "v1.0.1"}}}, + {canal, {git, "https://github.com/valitydev/canal", {branch, "ft/pass-httpc-opts"}}}, + {progressor, {git, "https://github.com/valitydev/progressor.git", {tag, "v1.0.2"}}}, % for configurator script {yamerl, {git, "https://github.com/valitydev/yamerl", {branch, master}}}, {cg_mon, {git, "https://github.com/valitydev/cg_mon", {branch, master}}} diff --git a/rebar.lock b/rebar.lock index dfa7e3f..b47e23c 100644 --- a/rebar.lock +++ b/rebar.lock @@ -5,8 +5,8 @@ {<<"cache">>,{pkg,<<"cache">>,<<"2.3.3">>},1}, {<<"canal">>, {git,"https://github.com/valitydev/canal", - {ref,"621d3821cd0a6036fee75d8e3b2d17167f3268e4"}}, - 2}, + {ref,"61c91d122dae4c7dba5e67b55831f0dd97e2cbe2"}}, + 0}, {<<"certifi">>,{pkg,<<"certifi">>,<<"2.8.0">>},2}, {<<"cg_mon">>, {git,"https://github.com/valitydev/cg_mon", @@ -36,9 +36,13 @@ {<<"gproc">>,{pkg,<<"gproc">>,<<"0.9.0">>},0}, {<<"grpcbox">>,{pkg,<<"grpcbox">>,<<"0.17.1">>},1}, {<<"hackney">>,{pkg,<<"hackney">>,<<"1.18.0">>},1}, + {<<"hamcrest">>, + {git,"https://github.com/basho/hamcrest-erlang.git", + {ref,"ad3dbab419762fc2d5821abb88b989da006b85c6"}}, + 2}, {<<"hpack">>,{pkg,<<"hpack_erl">>,<<"0.3.0">>},3}, {<<"idna">>,{pkg,<<"idna">>,<<"6.1.1">>},2}, - {<<"jsone">>,{pkg,<<"jsone">>,<<"1.8.0">>},3}, + {<<"jsone">>,{pkg,<<"jsone">>,<<"1.8.0">>},1}, {<<"jsx">>,{pkg,<<"jsx">>,<<"3.1.0">>},1}, {<<"kafka_protocol">>,{pkg,<<"kafka_protocol">>,<<"4.0.1">>},1}, {<<"metrics">>,{pkg,<<"metrics">>,<<"1.0.1">>},2}, @@ -63,7 +67,7 @@ 0}, {<<"progressor">>, {git,"https://github.com/valitydev/progressor.git", - {ref,"6df2e447a867434ad45bfc3540c4681e10105e02"}}, + {ref,"4c44615f712ae8992ff1a654f227def9f44c8aa7"}}, 0}, {<<"prometheus">>,{pkg,<<"prometheus">>,<<"4.11.0">>},0}, {<<"prometheus_cowboy">>,{pkg,<<"prometheus_cowboy">>,<<"0.1.9">>},0}, From 631c63f801f99047628d0b5f33c9654975720b11 Mon Sep 17 00:00:00 2001 From: ttt161 Date: Mon, 16 Jun 2025 09:16:41 +0300 Subject: [PATCH 03/12] bump progressor --- rebar.config | 3 +-- rebar.lock | 10 +++++----- 2 files changed, 6 insertions(+), 7 deletions(-) diff --git a/rebar.config b/rebar.config index d6abc9f..5859e4a 100644 --- a/rebar.config +++ b/rebar.config @@ -51,8 +51,7 @@ {deps, [ {genlib, {git, "https://github.com/valitydev/genlib", {tag, "v1.1.0"}}}, - {canal, {git, "https://github.com/valitydev/canal", {branch, "ft/pass-httpc-opts"}}}, - {progressor, {git, "https://github.com/valitydev/progressor.git", {tag, "v1.0.2"}}}, + {progressor, {git, "https://github.com/valitydev/progressor.git", {tag, "v1.0.3"}}}, % for configurator script {yamerl, {git, "https://github.com/valitydev/yamerl", {branch, master}}}, {cg_mon, {git, "https://github.com/valitydev/cg_mon", {branch, master}}} diff --git a/rebar.lock b/rebar.lock index b47e23c..87f9c99 100644 --- a/rebar.lock +++ b/rebar.lock @@ -5,8 +5,8 @@ {<<"cache">>,{pkg,<<"cache">>,<<"2.3.3">>},1}, {<<"canal">>, {git,"https://github.com/valitydev/canal", - {ref,"61c91d122dae4c7dba5e67b55831f0dd97e2cbe2"}}, - 0}, + {ref,"89faedce3b054bcca7cc31ca64d2ead8a9402305"}}, + 2}, {<<"certifi">>,{pkg,<<"certifi">>,<<"2.8.0">>},2}, {<<"cg_mon">>, {git,"https://github.com/valitydev/cg_mon", @@ -19,7 +19,7 @@ {<<"ctx">>,{pkg,<<"ctx">>,<<"0.6.0">>},2}, {<<"epg_connector">>, {git,"https://github.com/valitydev/epg_connector.git", - {ref,"dd93e27c00d492169e8a7bfc38976b911c6e7d05"}}, + {ref,"4c35b8dc26955e589323c64bd1dd0c9abe1e3c13"}}, 1}, {<<"epgsql">>, {git,"https://github.com/epgsql/epgsql.git", @@ -42,7 +42,7 @@ 2}, {<<"hpack">>,{pkg,<<"hpack_erl">>,<<"0.3.0">>},3}, {<<"idna">>,{pkg,<<"idna">>,<<"6.1.1">>},2}, - {<<"jsone">>,{pkg,<<"jsone">>,<<"1.8.0">>},1}, + {<<"jsone">>,{pkg,<<"jsone">>,<<"1.8.0">>},3}, {<<"jsx">>,{pkg,<<"jsx">>,<<"3.1.0">>},1}, {<<"kafka_protocol">>,{pkg,<<"kafka_protocol">>,<<"4.0.1">>},1}, {<<"metrics">>,{pkg,<<"metrics">>,<<"1.0.1">>},2}, @@ -67,7 +67,7 @@ 0}, {<<"progressor">>, {git,"https://github.com/valitydev/progressor.git", - {ref,"4c44615f712ae8992ff1a654f227def9f44c8aa7"}}, + {ref,"8ce69f723b8dce8ac4d0b66ef63af6d4a5d4a309"}}, 0}, {<<"prometheus">>,{pkg,<<"prometheus">>,<<"4.11.0">>},0}, {<<"prometheus_cowboy">>,{pkg,<<"prometheus_cowboy">>,<<"0.1.9">>},0}, From a524d5f5ebbda4cd113a28dd53d5a2f332c26007 Mon Sep 17 00:00:00 2001 From: ttt161 Date: Mon, 16 Jun 2025 09:55:50 +0300 Subject: [PATCH 04/12] update configurator --- rel_scripts/configurator.escript | 37 ++++++++++++++++++++++++++++++-- 1 file changed, 35 insertions(+), 2 deletions(-) diff --git a/rel_scripts/configurator.escript b/rel_scripts/configurator.escript index 5a7b090..bf39570 100755 --- a/rel_scripts/configurator.escript +++ b/rel_scripts/configurator.escript @@ -650,17 +650,50 @@ prg_namespace(NsPgPool) -> }. canal(YamlConfig) -> + Default = [ + {httpc_options, [{ssl, [{verify, verify_none}]}]}, + {kvv2_secret_mount_path, "/secret/data/"} + ], lists:foldl( fun ({<<"url">>, Url}, Acc) -> [{url, unicode:characters_to_list(Url)} | Acc]; ({<<"engine">>, Value}, Acc) -> - [{engine, ?C:atom(Value)} | Acc] + [{engine, ?C:atom(Value)} | Acc]; + ({<<"httpc_options">>, Value}, Acc) -> + [{httpc_options, canal_httpc_options(Value)} | Acc]; + ({<<"kvv2_secret_mount_path">>, Value}, Acc) -> + [{kvv2_secret_mount_path, unicode:characters_to_list(Value)} | Acc] end, - [], + Default, ?C:conf([canal], YamlConfig, []) ). +canal_httpc_options(Value) -> + lists:foldl( + fun + ({<<"ssl">>, SslOpts}, Acc) -> + [{ssl, ssl_opts(SslOpts)} | Acc]; + (_, Acc) -> + Acc + end, + [], + Value + ). + +ssl_opts(Opts) -> + Default = [{ssl, [{verify, verify_none}]}], + lists:foldl( + fun + ({<<"verify">>, Value}, Acc) -> + [{verify, ?C:atom(Value)} | Acc]; + (_, Acc) -> + Acc + end, + Default, + Opts + ). + %% %% vm.args %% From 7d776c0f6fa60a9a73839d1bbec70c566bbde483 Mon Sep 17 00:00:00 2001 From: ttt161 Date: Tue, 16 Sep 2025 14:08:21 +0300 Subject: [PATCH 05/12] fix aux_state marshaling --- apps/mg_progressor/src/mg_progressor.erl | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/apps/mg_progressor/src/mg_progressor.erl b/apps/mg_progressor/src/mg_progressor.erl index a7ea20a..81f2936 100644 --- a/apps/mg_progressor/src/mg_progressor.erl +++ b/apps/mg_progressor/src/mg_progressor.erl @@ -121,9 +121,12 @@ maybe_unmarshal(Type, Value) -> unmarshal(term, Value) -> erlang:term_to_binary(Value). -maybe_marshal(_Type, undefined) -> - undefined; maybe_marshal(Type, Value) -> + maybe_marshal(Type, Value, undefined). + +maybe_marshal(_Type, undefined, Default) -> + Default; +maybe_marshal(Type, Value, _Default) -> marshal(Type, Value). marshal(process, Process) -> @@ -133,8 +136,10 @@ marshal(process, Process) -> history = maybe_marshal(history, maps:get(history, Process)), history_range = marshal(history_range, maps:get(history_range, Process)), status = marshal(status, {maps:get(status, Process), maps:get(detail, Process, undefined)}), - aux_state = maybe_marshal(term, maps:get(aux_state, Process, undefined)) + aux_state = maybe_marshal(aux_state, maps:get(aux_state, Process, undefined), {bin, <<>>}) }; +marshal(aux_state, AuxState) -> + #mg_stateproc_Content{data = maybe_marshal(term, AuxState)}; marshal(history, History) -> lists:map(fun(Ev) -> marshal(event, Ev) end, History); marshal(event, Event) -> From d61886683089c44aa29e7d53bc2d7ddc59ebf476 Mon Sep 17 00:00:00 2001 From: ttt161 Date: Tue, 16 Sep 2025 15:31:13 +0300 Subject: [PATCH 06/12] fix aux_data marshaling again --- apps/mg_progressor/src/mg_progressor.erl | 54 ++++++++++++++++++++---- 1 file changed, 46 insertions(+), 8 deletions(-) diff --git a/apps/mg_progressor/src/mg_progressor.erl b/apps/mg_progressor/src/mg_progressor.erl index 81f2936..6b9128a 100644 --- a/apps/mg_progressor/src/mg_progressor.erl +++ b/apps/mg_progressor/src/mg_progressor.erl @@ -121,12 +121,9 @@ maybe_unmarshal(Type, Value) -> unmarshal(term, Value) -> erlang:term_to_binary(Value). +maybe_marshal(_Type, undefined) -> + undefined; maybe_marshal(Type, Value) -> - maybe_marshal(Type, Value, undefined). - -maybe_marshal(_Type, undefined, Default) -> - Default; -maybe_marshal(Type, Value, _Default) -> marshal(Type, Value). marshal(process, Process) -> @@ -136,10 +133,8 @@ marshal(process, Process) -> history = maybe_marshal(history, maps:get(history, Process)), history_range = marshal(history_range, maps:get(history_range, Process)), status = marshal(status, {maps:get(status, Process), maps:get(detail, Process, undefined)}), - aux_state = maybe_marshal(aux_state, maps:get(aux_state, Process, undefined), {bin, <<>>}) + aux_state = to_content(maybe_marshal(term, maps:get(aux_state, Process, undefined))) }; -marshal(aux_state, AuxState) -> - #mg_stateproc_Content{data = maybe_marshal(term, AuxState)}; marshal(history, History) -> lists:map(fun(Ev) -> marshal(event, Ev) end, History); marshal(event, Event) -> @@ -174,3 +169,46 @@ format_version(#{<<"format_version">> := Version}) -> Version; format_version(_) -> undefined. + +to_content(undefined) -> + undefined; +to_content(#mg_stateproc_Content{} = Content) -> + Content; +to_content({T, _V} = MsgPackValue) when + T =:= nl; + T =:= b; + T =:= i; + T =:= flt; + T =:= str; + T =:= bin; + T =:= arr; + T =:= obj +-> + #mg_stateproc_Content{data = MsgPackValue}; +to_content(Data) -> + #mg_stateproc_Content{data = to_msgpack(Data)}. + +to_msgpack(undefined) -> + {nl, #mg_msgpack_Nil{}}; +to_msgpack(Binary) when is_binary(Binary) -> + #mg_stateproc_Content{data = {bin, Binary}}; +to_msgpack(Boolean) when is_boolean(Boolean) -> + #mg_stateproc_Content{data = {b, Boolean}}; +to_msgpack(Integer) when is_integer(Integer) -> + #mg_stateproc_Content{data = {i, Integer}}; +to_msgpack(Float) when is_float(Float) -> + #mg_stateproc_Content{data = {flt, Float}}; +to_msgpack(Array) when is_list(Array) -> + #mg_stateproc_Content{data = {arr, lists:map(fun to_msgpack/1, Array)}}; +to_msgpack(Object) when is_map(Object) -> + Data = { + obj, + maps:fold( + fun(K, V, Acc) -> + maps:put(to_msgpack(K), to_msgpack(V), Acc) + end, + #{}, + Object + ) + }, + #mg_stateproc_Content{data = Data}. From 16eab5cacca6006d42ff462d080de999e086382f Mon Sep 17 00:00:00 2001 From: ttt161 Date: Tue, 16 Sep 2025 15:35:04 +0300 Subject: [PATCH 07/12] fix formatting --- apps/mg_progressor/src/mg_progressor.erl | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/apps/mg_progressor/src/mg_progressor.erl b/apps/mg_progressor/src/mg_progressor.erl index 6b9128a..9d9c278 100644 --- a/apps/mg_progressor/src/mg_progressor.erl +++ b/apps/mg_progressor/src/mg_progressor.erl @@ -201,7 +201,7 @@ to_msgpack(Float) when is_float(Float) -> to_msgpack(Array) when is_list(Array) -> #mg_stateproc_Content{data = {arr, lists:map(fun to_msgpack/1, Array)}}; to_msgpack(Object) when is_map(Object) -> - Data = { + Data = { obj, maps:fold( fun(K, V, Acc) -> From ed57e3e112916f9d4077a08378cdd10e9449bbfd Mon Sep 17 00:00:00 2001 From: ttt161 Date: Tue, 16 Sep 2025 16:20:12 +0300 Subject: [PATCH 08/12] fix msgpack marshaling --- apps/mg_progressor/src/mg_progressor.erl | 14 ++++++++++---- 1 file changed, 10 insertions(+), 4 deletions(-) diff --git a/apps/mg_progressor/src/mg_progressor.erl b/apps/mg_progressor/src/mg_progressor.erl index 9d9c278..f02007f 100644 --- a/apps/mg_progressor/src/mg_progressor.erl +++ b/apps/mg_progressor/src/mg_progressor.erl @@ -201,8 +201,7 @@ to_msgpack(Float) when is_float(Float) -> to_msgpack(Array) when is_list(Array) -> #mg_stateproc_Content{data = {arr, lists:map(fun to_msgpack/1, Array)}}; to_msgpack(Object) when is_map(Object) -> - Data = { - obj, + try maps:fold( fun(K, V, Acc) -> maps:put(to_msgpack(K), to_msgpack(V), Acc) @@ -210,5 +209,12 @@ to_msgpack(Object) when is_map(Object) -> #{}, Object ) - }, - #mg_stateproc_Content{data = Data}. + of + Data -> + #mg_stateproc_Content{data = {obj, Data}} + catch + _:_ -> + #mg_stateproc_Content{data = {bin, erlang:term_to_binary(Object)}} + end; +to_msgpack(Term) -> + #mg_stateproc_Content{data = {bin, erlang:term_to_binary(Term)}}. From fa095527c5583eebe582472e4a3d0f6808e4e658 Mon Sep 17 00:00:00 2001 From: ttt161 Date: Tue, 16 Sep 2025 16:48:40 +0300 Subject: [PATCH 09/12] fix msgpack marshaling --- apps/mg_progressor/src/mg_progressor.erl | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/apps/mg_progressor/src/mg_progressor.erl b/apps/mg_progressor/src/mg_progressor.erl index f02007f..9451b72 100644 --- a/apps/mg_progressor/src/mg_progressor.erl +++ b/apps/mg_progressor/src/mg_progressor.erl @@ -191,15 +191,15 @@ to_content(Data) -> to_msgpack(undefined) -> {nl, #mg_msgpack_Nil{}}; to_msgpack(Binary) when is_binary(Binary) -> - #mg_stateproc_Content{data = {bin, Binary}}; + {bin, Binary}; to_msgpack(Boolean) when is_boolean(Boolean) -> - #mg_stateproc_Content{data = {b, Boolean}}; + {b, Boolean}; to_msgpack(Integer) when is_integer(Integer) -> - #mg_stateproc_Content{data = {i, Integer}}; + {i, Integer}; to_msgpack(Float) when is_float(Float) -> - #mg_stateproc_Content{data = {flt, Float}}; + {flt, Float}; to_msgpack(Array) when is_list(Array) -> - #mg_stateproc_Content{data = {arr, lists:map(fun to_msgpack/1, Array)}}; + {arr, lists:map(fun to_msgpack/1, Array)}; to_msgpack(Object) when is_map(Object) -> try maps:fold( @@ -211,10 +211,10 @@ to_msgpack(Object) when is_map(Object) -> ) of Data -> - #mg_stateproc_Content{data = {obj, Data}} + {obj, Data} catch _:_ -> - #mg_stateproc_Content{data = {bin, erlang:term_to_binary(Object)}} + {bin, erlang:term_to_binary(Object)} end; to_msgpack(Term) -> - #mg_stateproc_Content{data = {bin, erlang:term_to_binary(Term)}}. + {bin, erlang:term_to_binary(Term)}. From 153b7f4ce8f00b7ff35daef3f28eb5f584ab3d6d Mon Sep 17 00:00:00 2001 From: ttt161 Date: Fri, 12 Dec 2025 08:27:30 +0300 Subject: [PATCH 10/12] bump progressor, update configurator.escript --- config/config.yaml | 20 ++++++++++++- rebar.config | 2 +- rebar.lock | 10 ++----- rel_scripts/configurator.escript | 48 ++++++++++++++++++++++++-------- 4 files changed, 59 insertions(+), 21 deletions(-) diff --git a/config/config.yaml b/config/config.yaml index c0c9f8d..3123aac 100644 --- a/config/config.yaml +++ b/config/config.yaml @@ -354,7 +354,25 @@ postgres: ## Progressor namespaces settings progressor: - prg_test_ns: some_processor_pool1 + prg_test_ns_2: + # required + storage: + # optional + client: prg_pg_backend + # required + options: + # required + pool: another_processor_pool + # optional + notifier: + # required + client: default_kafka_client + # required + options: + # required + topic: eventsink_topic + # required + lifecycle_topic: lifecycle_topic # Optional section # if not defined then canal will not be started diff --git a/rebar.config b/rebar.config index 5859e4a..dff23ee 100644 --- a/rebar.config +++ b/rebar.config @@ -51,7 +51,7 @@ {deps, [ {genlib, {git, "https://github.com/valitydev/genlib", {tag, "v1.1.0"}}}, - {progressor, {git, "https://github.com/valitydev/progressor.git", {tag, "v1.0.3"}}}, + {progressor, {git, "https://github.com/valitydev/progressor.git", {branch, "fx/fix-for-simple-repair"}}}, % for configurator script {yamerl, {git, "https://github.com/valitydev/yamerl", {branch, master}}}, {cg_mon, {git, "https://github.com/valitydev/cg_mon", {branch, master}}} diff --git a/rebar.lock b/rebar.lock index f7f06e9..e6fd204 100644 --- a/rebar.lock +++ b/rebar.lock @@ -19,11 +19,11 @@ {<<"ctx">>,{pkg,<<"ctx">>,<<"0.6.0">>},2}, {<<"epg_connector">>, {git,"https://github.com/valitydev/epg_connector.git", - {ref,"4c35b8dc26955e589323c64bd1dd0c9abe1e3c13"}}, + {ref,"af35200fa1c63e7afeaa90cad862944c194b2686"}}, 1}, {<<"epgsql">>, {git,"https://github.com/epgsql/epgsql.git", - {ref,"7ba52768cf0ea7d084df24d4275a88eef4db13c2"}}, + {ref,"28e9f84c95065a51e92baeb37d2cf1687fc4b9ce"}}, 2}, {<<"erl_health">>, {git,"https://github.com/valitydev/erlang-health", @@ -36,10 +36,6 @@ {<<"gproc">>,{pkg,<<"gproc">>,<<"0.9.0">>},0}, {<<"grpcbox">>,{pkg,<<"grpcbox">>,<<"0.17.1">>},1}, {<<"hackney">>,{pkg,<<"hackney">>,<<"1.18.0">>},1}, - {<<"hamcrest">>, - {git,"https://github.com/basho/hamcrest-erlang.git", - {ref,"ad3dbab419762fc2d5821abb88b989da006b85c6"}}, - 2}, {<<"hpack">>,{pkg,<<"hpack_erl">>,<<"0.3.0">>},3}, {<<"idna">>,{pkg,<<"idna">>,<<"6.1.1">>},2}, {<<"jsone">>,{pkg,<<"jsone">>,<<"1.8.0">>},3}, @@ -67,7 +63,7 @@ 0}, {<<"progressor">>, {git,"https://github.com/valitydev/progressor.git", - {ref,"8ce69f723b8dce8ac4d0b66ef63af6d4a5d4a309"}}, + {ref,"83330e6eb697267bdb802090c827b54d86c38b3e"}}, 0}, {<<"prometheus">>,{pkg,<<"prometheus">>,<<"4.11.0">>},0}, {<<"prometheus_cowboy">>,{pkg,<<"prometheus_cowboy">>,<<"0.1.9">>},0}, diff --git a/rel_scripts/configurator.escript b/rel_scripts/configurator.escript index bf39570..2abd120 100755 --- a/rel_scripts/configurator.escript +++ b/rel_scripts/configurator.escript @@ -628,25 +628,49 @@ pg_pool_opts(PoolOpts) -> progressor(YamlConfig) -> PrgNamespaces = lists:foldl( - fun({NsName, NsPgPool}, Acc) -> - Acc#{?C:atom(NsName) => prg_namespace(?C:atom(NsPgPool))} + fun({NsName, NsOpts}, Acc) -> + Acc#{?C:atom(NsName) => prg_namespace(NsOpts)} end, #{}, ?C:conf([progressor], YamlConfig, []) ), [{namespaces, PrgNamespaces}]. -prg_namespace(NsPgPool) -> - #{ - storage => #{ - client => prg_pg_backend, - options => #{pool => NsPgPool} - }, - processor => #{ - %% Never will be called - client => null - }, +prg_namespace(NsOptsList) -> + InitAcc = #{ + processor => #{client => null}, worker_pool_size => 0 + }, + lists:foldl( + fun + ({<<"storage">>, StorageOpts}, Acc) -> Acc#{storage => prg_storage_opts(StorageOpts)}; + ({<<"notifier">>, NotifierOpts}, Acc) -> Acc#{notifier => prg_notifier_opts(NotifierOpts)}; + (_, Acc) -> Acc + end, + InitAcc, + NsOptsList + ). + +prg_storage_opts(StorageOpts) -> + Client = ?C:atom(?C:conf([client], StorageOpts, <<"prg_pg_backend">>)), + OptsList = ?C:conf([options], StorageOpts), + #{ + client => Client, + options => prg_storage_handler_opts(Client, OptsList) + }. + +prg_storage_handler_opts(prg_pg_backend, OptsList) -> + #{pool => ?C:atom(?C:conf([pool], OptsList))}. + +prg_notifier_opts(NotifierOpts) -> + Client = ?C:atom(?C:conf([client], NotifierOpts)), + OptsList = ?C:conf([options], NotifierOpts), + #{ + client => Client, + options => #{ + topic => ?C:conf([topic], OptsList), + lifecycle_topic => ?C:conf([lifecycle_topic], OptsList) + } }. canal(YamlConfig) -> From 23e05130b086aaa3386e8d41a7e40f427524c5c5 Mon Sep 17 00:00:00 2001 From: ttt161 Date: Fri, 12 Dec 2025 08:42:10 +0300 Subject: [PATCH 11/12] fix yaml format --- config/config.yaml | 34 +++++++++++++++++----------------- 1 file changed, 17 insertions(+), 17 deletions(-) diff --git a/config/config.yaml b/config/config.yaml index 3123aac..0549e5d 100644 --- a/config/config.yaml +++ b/config/config.yaml @@ -355,24 +355,24 @@ postgres: progressor: prg_test_ns_2: - # required - storage: - # optional - client: prg_pg_backend - # required - options: - # required - pool: another_processor_pool - # optional - notifier: # required - client: default_kafka_client - # required - options: - # required - topic: eventsink_topic - # required - lifecycle_topic: lifecycle_topic + storage: + # optional + client: prg_pg_backend + # required + options: + # required + pool: another_processor_pool + # optional + notifier: + # required + client: default_kafka_client + # required + options: + # required + topic: eventsink_topic + # required + lifecycle_topic: lifecycle_topic # Optional section # if not defined then canal will not be started From 6b74da9d889950b88d369fc3927d4c0fe4a249a9 Mon Sep 17 00:00:00 2001 From: ttt161 Date: Mon, 15 Dec 2025 15:09:35 +0300 Subject: [PATCH 12/12] bump progressor-1.0.15 --- rebar.config | 2 +- rebar.lock | 2 +- rel_scripts/configurator.escript | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/rebar.config b/rebar.config index dff23ee..3aaa821 100644 --- a/rebar.config +++ b/rebar.config @@ -51,7 +51,7 @@ {deps, [ {genlib, {git, "https://github.com/valitydev/genlib", {tag, "v1.1.0"}}}, - {progressor, {git, "https://github.com/valitydev/progressor.git", {branch, "fx/fix-for-simple-repair"}}}, + {progressor, {git, "https://github.com/valitydev/progressor.git", {tag, "v1.0.15"}}}, % for configurator script {yamerl, {git, "https://github.com/valitydev/yamerl", {branch, master}}}, {cg_mon, {git, "https://github.com/valitydev/cg_mon", {branch, master}}} diff --git a/rebar.lock b/rebar.lock index e6fd204..fdf8755 100644 --- a/rebar.lock +++ b/rebar.lock @@ -63,7 +63,7 @@ 0}, {<<"progressor">>, {git,"https://github.com/valitydev/progressor.git", - {ref,"83330e6eb697267bdb802090c827b54d86c38b3e"}}, + {ref,"fdebffd0db07208faa452e0151491769949a5076"}}, 0}, {<<"prometheus">>,{pkg,<<"prometheus">>,<<"4.11.0">>},0}, {<<"prometheus_cowboy">>,{pkg,<<"prometheus_cowboy">>,<<"0.1.9">>},0}, diff --git a/rel_scripts/configurator.escript b/rel_scripts/configurator.escript index 2abd120..d9df4f7 100755 --- a/rel_scripts/configurator.escript +++ b/rel_scripts/configurator.escript @@ -634,7 +634,7 @@ progressor(YamlConfig) -> #{}, ?C:conf([progressor], YamlConfig, []) ), - [{namespaces, PrgNamespaces}]. + [{namespaces, PrgNamespaces}, {migration_enabled, false}]. prg_namespace(NsOptsList) -> InitAcc = #{