diff --git a/apps/mg_progressor/src/mg_progressor.erl b/apps/mg_progressor/src/mg_progressor.erl index a7ea20a..9451b72 100644 --- a/apps/mg_progressor/src/mg_progressor.erl +++ b/apps/mg_progressor/src/mg_progressor.erl @@ -133,7 +133,7 @@ marshal(process, Process) -> history = maybe_marshal(history, maps:get(history, Process)), history_range = marshal(history_range, maps:get(history_range, Process)), status = marshal(status, {maps:get(status, Process), maps:get(detail, Process, undefined)}), - aux_state = maybe_marshal(term, maps:get(aux_state, Process, undefined)) + aux_state = to_content(maybe_marshal(term, maps:get(aux_state, Process, undefined))) }; marshal(history, History) -> lists:map(fun(Ev) -> marshal(event, Ev) end, History); @@ -169,3 +169,52 @@ format_version(#{<<"format_version">> := Version}) -> Version; format_version(_) -> undefined. + +to_content(undefined) -> + undefined; +to_content(#mg_stateproc_Content{} = Content) -> + Content; +to_content({T, _V} = MsgPackValue) when + T =:= nl; + T =:= b; + T =:= i; + T =:= flt; + T =:= str; + T =:= bin; + T =:= arr; + T =:= obj +-> + #mg_stateproc_Content{data = MsgPackValue}; +to_content(Data) -> + #mg_stateproc_Content{data = to_msgpack(Data)}. + +to_msgpack(undefined) -> + {nl, #mg_msgpack_Nil{}}; +to_msgpack(Binary) when is_binary(Binary) -> + {bin, Binary}; +to_msgpack(Boolean) when is_boolean(Boolean) -> + {b, Boolean}; +to_msgpack(Integer) when is_integer(Integer) -> + {i, Integer}; +to_msgpack(Float) when is_float(Float) -> + {flt, Float}; +to_msgpack(Array) when is_list(Array) -> + {arr, lists:map(fun to_msgpack/1, Array)}; +to_msgpack(Object) when is_map(Object) -> + try + maps:fold( + fun(K, V, Acc) -> + maps:put(to_msgpack(K), to_msgpack(V), Acc) + end, + #{}, + Object + ) + of + Data -> + {obj, Data} + catch + _:_ -> + {bin, erlang:term_to_binary(Object)} + end; +to_msgpack(Term) -> + {bin, erlang:term_to_binary(Term)}. diff --git a/config/config.yaml b/config/config.yaml index c0c9f8d..0549e5d 100644 --- a/config/config.yaml +++ b/config/config.yaml @@ -354,7 +354,25 @@ postgres: ## Progressor namespaces settings progressor: - prg_test_ns: some_processor_pool1 + prg_test_ns_2: + # required + storage: + # optional + client: prg_pg_backend + # required + options: + # required + pool: another_processor_pool + # optional + notifier: + # required + client: default_kafka_client + # required + options: + # required + topic: eventsink_topic + # required + lifecycle_topic: lifecycle_topic # Optional section # if not defined then canal will not be started diff --git a/rebar.config b/rebar.config index 7d4e8e3..3aaa821 100644 --- a/rebar.config +++ b/rebar.config @@ -51,7 +51,7 @@ {deps, [ {genlib, {git, "https://github.com/valitydev/genlib", {tag, "v1.1.0"}}}, - {progressor, {git, "https://github.com/valitydev/progressor.git", {tag, "v1.0.1"}}}, + {progressor, {git, "https://github.com/valitydev/progressor.git", {tag, "v1.0.15"}}}, % for configurator script {yamerl, {git, "https://github.com/valitydev/yamerl", {branch, master}}}, {cg_mon, {git, "https://github.com/valitydev/cg_mon", {branch, master}}} @@ -76,6 +76,7 @@ {tools, load}, % log formatter {logger_logstash_formatter, load}, + {canal, load}, % main app {machinegun, permanent} ]}, diff --git a/rebar.lock b/rebar.lock index 80e2518..fdf8755 100644 --- a/rebar.lock +++ b/rebar.lock @@ -5,7 +5,7 @@ {<<"cache">>,{pkg,<<"cache">>,<<"2.3.3">>},1}, {<<"canal">>, {git,"https://github.com/valitydev/canal", - {ref,"621d3821cd0a6036fee75d8e3b2d17167f3268e4"}}, + {ref,"89faedce3b054bcca7cc31ca64d2ead8a9402305"}}, 2}, {<<"certifi">>,{pkg,<<"certifi">>,<<"2.8.0">>},2}, {<<"cg_mon">>, @@ -19,11 +19,11 @@ {<<"ctx">>,{pkg,<<"ctx">>,<<"0.6.0">>},2}, {<<"epg_connector">>, {git,"https://github.com/valitydev/epg_connector.git", - {ref,"dd93e27c00d492169e8a7bfc38976b911c6e7d05"}}, + {ref,"af35200fa1c63e7afeaa90cad862944c194b2686"}}, 1}, {<<"epgsql">>, {git,"https://github.com/epgsql/epgsql.git", - {ref,"7ba52768cf0ea7d084df24d4275a88eef4db13c2"}}, + {ref,"28e9f84c95065a51e92baeb37d2cf1687fc4b9ce"}}, 2}, {<<"erl_health">>, {git,"https://github.com/valitydev/erlang-health", @@ -36,10 +36,6 @@ {<<"gproc">>,{pkg,<<"gproc">>,<<"0.9.0">>},0}, {<<"grpcbox">>,{pkg,<<"grpcbox">>,<<"0.17.1">>},1}, {<<"hackney">>,{pkg,<<"hackney">>,<<"1.18.0">>},1}, - {<<"hamcrest">>, - {git,"https://github.com/basho/hamcrest-erlang.git", - {ref,"ad3dbab419762fc2d5821abb88b989da006b85c6"}}, - 2}, {<<"hpack">>,{pkg,<<"hpack_erl">>,<<"0.3.0">>},3}, {<<"idna">>,{pkg,<<"idna">>,<<"6.1.1">>},2}, {<<"jsone">>,{pkg,<<"jsone">>,<<"1.8.0">>},3}, @@ -67,7 +63,7 @@ 0}, {<<"progressor">>, {git,"https://github.com/valitydev/progressor.git", - {ref,"6df2e447a867434ad45bfc3540c4681e10105e02"}}, + {ref,"fdebffd0db07208faa452e0151491769949a5076"}}, 0}, {<<"prometheus">>,{pkg,<<"prometheus">>,<<"4.11.0">>},0}, {<<"prometheus_cowboy">>,{pkg,<<"prometheus_cowboy">>,<<"0.1.9">>},0}, diff --git a/rel_scripts/configurator.escript b/rel_scripts/configurator.escript index 5a7b090..d9df4f7 100755 --- a/rel_scripts/configurator.escript +++ b/rel_scripts/configurator.escript @@ -628,39 +628,96 @@ pg_pool_opts(PoolOpts) -> progressor(YamlConfig) -> PrgNamespaces = lists:foldl( - fun({NsName, NsPgPool}, Acc) -> - Acc#{?C:atom(NsName) => prg_namespace(?C:atom(NsPgPool))} + fun({NsName, NsOpts}, Acc) -> + Acc#{?C:atom(NsName) => prg_namespace(NsOpts)} end, #{}, ?C:conf([progressor], YamlConfig, []) ), - [{namespaces, PrgNamespaces}]. + [{namespaces, PrgNamespaces}, {migration_enabled, false}]. -prg_namespace(NsPgPool) -> - #{ - storage => #{ - client => prg_pg_backend, - options => #{pool => NsPgPool} - }, - processor => #{ - %% Never will be called - client => null - }, +prg_namespace(NsOptsList) -> + InitAcc = #{ + processor => #{client => null}, worker_pool_size => 0 + }, + lists:foldl( + fun + ({<<"storage">>, StorageOpts}, Acc) -> Acc#{storage => prg_storage_opts(StorageOpts)}; + ({<<"notifier">>, NotifierOpts}, Acc) -> Acc#{notifier => prg_notifier_opts(NotifierOpts)}; + (_, Acc) -> Acc + end, + InitAcc, + NsOptsList + ). + +prg_storage_opts(StorageOpts) -> + Client = ?C:atom(?C:conf([client], StorageOpts, <<"prg_pg_backend">>)), + OptsList = ?C:conf([options], StorageOpts), + #{ + client => Client, + options => prg_storage_handler_opts(Client, OptsList) + }. + +prg_storage_handler_opts(prg_pg_backend, OptsList) -> + #{pool => ?C:atom(?C:conf([pool], OptsList))}. + +prg_notifier_opts(NotifierOpts) -> + Client = ?C:atom(?C:conf([client], NotifierOpts)), + OptsList = ?C:conf([options], NotifierOpts), + #{ + client => Client, + options => #{ + topic => ?C:conf([topic], OptsList), + lifecycle_topic => ?C:conf([lifecycle_topic], OptsList) + } }. canal(YamlConfig) -> + Default = [ + {httpc_options, [{ssl, [{verify, verify_none}]}]}, + {kvv2_secret_mount_path, "/secret/data/"} + ], lists:foldl( fun ({<<"url">>, Url}, Acc) -> [{url, unicode:characters_to_list(Url)} | Acc]; ({<<"engine">>, Value}, Acc) -> - [{engine, ?C:atom(Value)} | Acc] + [{engine, ?C:atom(Value)} | Acc]; + ({<<"httpc_options">>, Value}, Acc) -> + [{httpc_options, canal_httpc_options(Value)} | Acc]; + ({<<"kvv2_secret_mount_path">>, Value}, Acc) -> + [{kvv2_secret_mount_path, unicode:characters_to_list(Value)} | Acc] end, - [], + Default, ?C:conf([canal], YamlConfig, []) ). +canal_httpc_options(Value) -> + lists:foldl( + fun + ({<<"ssl">>, SslOpts}, Acc) -> + [{ssl, ssl_opts(SslOpts)} | Acc]; + (_, Acc) -> + Acc + end, + [], + Value + ). + +ssl_opts(Opts) -> + Default = [{ssl, [{verify, verify_none}]}], + lists:foldl( + fun + ({<<"verify">>, Value}, Acc) -> + [{verify, ?C:atom(Value)} | Acc]; + (_, Acc) -> + Acc + end, + Default, + Opts + ). + %% %% vm.args %%