Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 50 additions & 1 deletion apps/mg_progressor/src/mg_progressor.erl
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ marshal(process, Process) ->
history = maybe_marshal(history, maps:get(history, Process)),
history_range = marshal(history_range, maps:get(history_range, Process)),
status = marshal(status, {maps:get(status, Process), maps:get(detail, Process, undefined)}),
aux_state = maybe_marshal(term, maps:get(aux_state, Process, undefined))
aux_state = to_content(maybe_marshal(term, maps:get(aux_state, Process, undefined)))
};
marshal(history, History) ->
lists:map(fun(Ev) -> marshal(event, Ev) end, History);
Expand Down Expand Up @@ -169,3 +169,52 @@ format_version(#{<<"format_version">> := Version}) ->
Version;
format_version(_) ->
undefined.

to_content(undefined) ->
undefined;
to_content(#mg_stateproc_Content{} = Content) ->
Content;
to_content({T, _V} = MsgPackValue) when
T =:= nl;
T =:= b;
T =:= i;
T =:= flt;
T =:= str;
T =:= bin;
T =:= arr;
T =:= obj
->
#mg_stateproc_Content{data = MsgPackValue};
to_content(Data) ->
#mg_stateproc_Content{data = to_msgpack(Data)}.

to_msgpack(undefined) ->
{nl, #mg_msgpack_Nil{}};
to_msgpack(Binary) when is_binary(Binary) ->
{bin, Binary};
to_msgpack(Boolean) when is_boolean(Boolean) ->
{b, Boolean};
to_msgpack(Integer) when is_integer(Integer) ->
{i, Integer};
to_msgpack(Float) when is_float(Float) ->
{flt, Float};
to_msgpack(Array) when is_list(Array) ->
{arr, lists:map(fun to_msgpack/1, Array)};
to_msgpack(Object) when is_map(Object) ->
try
maps:fold(
fun(K, V, Acc) ->
maps:put(to_msgpack(K), to_msgpack(V), Acc)
end,
#{},
Object
)
of
Data ->
{obj, Data}
catch
_:_ ->
{bin, erlang:term_to_binary(Object)}
end;
to_msgpack(Term) ->
{bin, erlang:term_to_binary(Term)}.
20 changes: 19 additions & 1 deletion config/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -354,7 +354,25 @@ postgres:
## Progressor namespaces settings

progressor:
prg_test_ns: some_processor_pool1
prg_test_ns_2:
# required
storage:
# optional
client: prg_pg_backend
# required
options:
# required
pool: another_processor_pool
# optional
notifier:
# required
client: default_kafka_client
# required
options:
# required
topic: eventsink_topic
# required
lifecycle_topic: lifecycle_topic

# Optional section
# if not defined then canal will not be started
Expand Down
3 changes: 2 additions & 1 deletion rebar.config
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@

{deps, [
{genlib, {git, "https://github.com/valitydev/genlib", {tag, "v1.1.0"}}},
{progressor, {git, "https://github.com/valitydev/progressor.git", {tag, "v1.0.1"}}},
{progressor, {git, "https://github.com/valitydev/progressor.git", {tag, "v1.0.15"}}},
% for configurator script
{yamerl, {git, "https://github.com/valitydev/yamerl", {branch, master}}},
{cg_mon, {git, "https://github.com/valitydev/cg_mon", {branch, master}}}
Expand All @@ -76,6 +76,7 @@
{tools, load},
% log formatter
{logger_logstash_formatter, load},
{canal, load},
% main app
{machinegun, permanent}
]},
Expand Down
12 changes: 4 additions & 8 deletions rebar.lock
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
{<<"cache">>,{pkg,<<"cache">>,<<"2.3.3">>},1},
{<<"canal">>,
{git,"https://github.com/valitydev/canal",
{ref,"621d3821cd0a6036fee75d8e3b2d17167f3268e4"}},
{ref,"89faedce3b054bcca7cc31ca64d2ead8a9402305"}},
2},
{<<"certifi">>,{pkg,<<"certifi">>,<<"2.8.0">>},2},
{<<"cg_mon">>,
Expand All @@ -19,11 +19,11 @@
{<<"ctx">>,{pkg,<<"ctx">>,<<"0.6.0">>},2},
{<<"epg_connector">>,
{git,"https://github.com/valitydev/epg_connector.git",
{ref,"dd93e27c00d492169e8a7bfc38976b911c6e7d05"}},
{ref,"af35200fa1c63e7afeaa90cad862944c194b2686"}},
1},
{<<"epgsql">>,
{git,"https://github.com/epgsql/epgsql.git",
{ref,"7ba52768cf0ea7d084df24d4275a88eef4db13c2"}},
{ref,"28e9f84c95065a51e92baeb37d2cf1687fc4b9ce"}},
2},
{<<"erl_health">>,
{git,"https://github.com/valitydev/erlang-health",
Expand All @@ -36,10 +36,6 @@
{<<"gproc">>,{pkg,<<"gproc">>,<<"0.9.0">>},0},
{<<"grpcbox">>,{pkg,<<"grpcbox">>,<<"0.17.1">>},1},
{<<"hackney">>,{pkg,<<"hackney">>,<<"1.18.0">>},1},
{<<"hamcrest">>,
{git,"https://github.com/basho/hamcrest-erlang.git",
{ref,"ad3dbab419762fc2d5821abb88b989da006b85c6"}},
2},
{<<"hpack">>,{pkg,<<"hpack_erl">>,<<"0.3.0">>},3},
{<<"idna">>,{pkg,<<"idna">>,<<"6.1.1">>},2},
{<<"jsone">>,{pkg,<<"jsone">>,<<"1.8.0">>},3},
Expand Down Expand Up @@ -67,7 +63,7 @@
0},
{<<"progressor">>,
{git,"https://github.com/valitydev/progressor.git",
{ref,"6df2e447a867434ad45bfc3540c4681e10105e02"}},
{ref,"fdebffd0db07208faa452e0151491769949a5076"}},
0},
{<<"prometheus">>,{pkg,<<"prometheus">>,<<"4.11.0">>},0},
{<<"prometheus_cowboy">>,{pkg,<<"prometheus_cowboy">>,<<"0.1.9">>},0},
Expand Down
87 changes: 72 additions & 15 deletions rel_scripts/configurator.escript
Original file line number Diff line number Diff line change
Expand Up @@ -628,39 +628,96 @@ pg_pool_opts(PoolOpts) ->

progressor(YamlConfig) ->
PrgNamespaces = lists:foldl(
fun({NsName, NsPgPool}, Acc) ->
Acc#{?C:atom(NsName) => prg_namespace(?C:atom(NsPgPool))}
fun({NsName, NsOpts}, Acc) ->
Acc#{?C:atom(NsName) => prg_namespace(NsOpts)}
end,
#{},
?C:conf([progressor], YamlConfig, [])
),
[{namespaces, PrgNamespaces}].
[{namespaces, PrgNamespaces}, {migration_enabled, false}].

prg_namespace(NsPgPool) ->
#{
storage => #{
client => prg_pg_backend,
options => #{pool => NsPgPool}
},
processor => #{
%% Never will be called
client => null
},
prg_namespace(NsOptsList) ->
InitAcc = #{
processor => #{client => null},
worker_pool_size => 0
},
lists:foldl(
fun
({<<"storage">>, StorageOpts}, Acc) -> Acc#{storage => prg_storage_opts(StorageOpts)};
({<<"notifier">>, NotifierOpts}, Acc) -> Acc#{notifier => prg_notifier_opts(NotifierOpts)};
(_, Acc) -> Acc
end,
InitAcc,
NsOptsList
).

prg_storage_opts(StorageOpts) ->
Client = ?C:atom(?C:conf([client], StorageOpts, <<"prg_pg_backend">>)),
OptsList = ?C:conf([options], StorageOpts),
#{
client => Client,
options => prg_storage_handler_opts(Client, OptsList)
}.

prg_storage_handler_opts(prg_pg_backend, OptsList) ->
#{pool => ?C:atom(?C:conf([pool], OptsList))}.

prg_notifier_opts(NotifierOpts) ->
Client = ?C:atom(?C:conf([client], NotifierOpts)),
OptsList = ?C:conf([options], NotifierOpts),
#{
client => Client,
options => #{
topic => ?C:conf([topic], OptsList),
lifecycle_topic => ?C:conf([lifecycle_topic], OptsList)
}
}.

canal(YamlConfig) ->
Default = [
{httpc_options, [{ssl, [{verify, verify_none}]}]},
{kvv2_secret_mount_path, "/secret/data/"}
],
lists:foldl(
fun
({<<"url">>, Url}, Acc) ->
[{url, unicode:characters_to_list(Url)} | Acc];
({<<"engine">>, Value}, Acc) ->
[{engine, ?C:atom(Value)} | Acc]
[{engine, ?C:atom(Value)} | Acc];
({<<"httpc_options">>, Value}, Acc) ->
[{httpc_options, canal_httpc_options(Value)} | Acc];
({<<"kvv2_secret_mount_path">>, Value}, Acc) ->
[{kvv2_secret_mount_path, unicode:characters_to_list(Value)} | Acc]
end,
[],
Default,
?C:conf([canal], YamlConfig, [])
).

canal_httpc_options(Value) ->
lists:foldl(
fun
({<<"ssl">>, SslOpts}, Acc) ->
[{ssl, ssl_opts(SslOpts)} | Acc];
(_, Acc) ->
Acc
end,
[],
Value
).

ssl_opts(Opts) ->
Default = [{ssl, [{verify, verify_none}]}],
lists:foldl(
fun
({<<"verify">>, Value}, Acc) ->
[{verify, ?C:atom(Value)} | Acc];
(_, Acc) ->
Acc
end,
Default,
Opts
).

%%
%% vm.args
%%
Expand Down
Loading