diff --git a/.github/workflows/erlang-checks.yml b/.github/workflows/erlang-checks.yml index c1af1f9..81f5e84 100644 --- a/.github/workflows/erlang-checks.yml +++ b/.github/workflows/erlang-checks.yml @@ -29,7 +29,7 @@ jobs: run: name: Run checks needs: setup - uses: valitydev/erlang-workflows/.github/workflows/erlang-parallel-build.yml@v1.0.13 + uses: valitydev/erlang-workflows/.github/workflows/erlang-parallel-build.yml@v1.0.14 with: otp-version: ${{ needs.setup.outputs.otp-version }} rebar-version: ${{ needs.setup.outputs.rebar-version }} diff --git a/.gitignore b/.gitignore index 3510a12..0e05830 100644 --- a/.gitignore +++ b/.gitignore @@ -18,3 +18,5 @@ _checkouts/ # make stuff /.image.* Makefile.env + +.idea \ No newline at end of file diff --git a/Dockerfile.dev b/Dockerfile.dev index 35d7bc0..5b66261 100644 --- a/Dockerfile.dev +++ b/Dockerfile.dev @@ -7,7 +7,7 @@ ARG THRIFT_VERSION ARG BUILDARCH RUN apt-get --yes update && \ - apt-get --yes --no-install-recommends install iproute2=5.10.0-4 && \ + apt-get --yes --no-install-recommends install iproute2=5.10.0-4 sshpass=1.09-1+b1 && \ apt-get clean && \ rm -rf /var/lib/apt/lists/* @@ -17,13 +17,19 @@ RUN wget -q -O- "https://github.com/valitydev/thrift/releases/download/${THRIFT_ ENV NETUNREACH_NETWORK="10.254.254.0/24" ENV NETUNREACH_ADDRESS="10.254.254.10" -RUN echo '#!/bin/sh' >> /entrypoint.sh && \ - echo "ip route add throw ${NETUNREACH_NETWORK}" >> /entrypoint.sh && \ - echo 'exec "$@"' >> /entrypoint.sh && \ - chmod +x /entrypoint.sh +COPY ./test_resources/ssh/ /root/.ssh/ +RUN chown -R root:root /root/.ssh && \ + chmod 600 /root/.ssh/* + +COPY ./test_resources/entrypoint.sh.dev /entrypoint.sh +RUN chmod +x /entrypoint.sh ENTRYPOINT ["/entrypoint.sh"] ENV CHARSET=UTF-8 ENV LANG=C.UTF-8 +ENV SERVICE_NAME=mg CMD ["/bin/bash"] + +EXPOSE 4369 +EXPOSE 8022 diff --git a/Dockerfile.test b/Dockerfile.test new file mode 100644 index 0000000..4fb8727 --- /dev/null +++ b/Dockerfile.test @@ -0,0 +1,64 @@ +ARG OTP_VERSION + +# Build the release +FROM docker.io/library/erlang:${OTP_VERSION} AS builder +SHELL ["/bin/bash", "-o", "pipefail", "-c"] + +# Install thrift compiler +ARG THRIFT_VERSION +ARG TARGETARCH + +RUN wget -q -O- "https://github.com/valitydev/thrift/releases/download/${THRIFT_VERSION}/thrift-${THRIFT_VERSION}-linux-${TARGETARCH}.tar.gz" \ + | tar -xvz -C /usr/local/bin/ + +# Copy sources +RUN mkdir /build +COPY . 
/build/ + +# Build the release +WORKDIR /build +RUN rebar3 compile \ + && rebar3 as prod release + +# Make a runner image +FROM docker.io/library/erlang:${OTP_VERSION}-slim + +ARG SERVICE_NAME=mg +ARG USER_UID=1001 +ARG USER_GID=$USER_UID + +# Set env +ENV CHARSET=UTF-8 +ENV LANG=C.UTF-8 + +# Expose SERVICE_NAME as env so CMD expands properly on start +ENV SERVICE_NAME=${SERVICE_NAME} + +# Set runtime +WORKDIR /opt/${SERVICE_NAME} + +COPY --from=builder /build/_build/prod/rel/${SERVICE_NAME} /opt/${SERVICE_NAME} +COPY ./test_resources/authorized_keys /root/.ssh/authorized_keys +RUN chown root:root /root/.ssh/authorized_keys && \ + chmod 600 /root/.ssh/* + +# install packages +RUN apt-get update && \ + apt-get --yes --no-install-recommends install openssh-server=1:8.4p1-5+deb11u3 iproute2=5.10.0-4 iputils-ping=3:20210202-1 && \ + apt-get clean && \ + rm -rf /var/lib/apt/lists/* + +# configure ssh +RUN mkdir /var/run/sshd && \ + /bin/bash -o pipefail -c "echo \"root:security\" | chpasswd" && \ + sed -i "s/#PermitRootLogin prohibit-password/PermitRootLogin yes/" /etc/ssh/sshd_config + +# create entrypoint +COPY ./test_resources/entrypoint.sh.test /entrypoint.sh +RUN chmod +x /entrypoint.sh + +ENTRYPOINT ["/entrypoint.sh"] + +EXPOSE 22 +EXPOSE 4369 +EXPOSE 8022 diff --git a/Makefile b/Makefile index 622758d..a629530 100644 --- a/Makefile +++ b/Makefile @@ -14,15 +14,19 @@ DOTENV := $(shell grep -v '^\#' .env) DEV_IMAGE_TAG = $(TEST_CONTAINER_NAME)-dev DEV_IMAGE_ID = $(file < .image.dev) +TEST_IMAGE_TAG = $(DIST_CONTAINER_NAME)-test +TEST_IMAGE_ID = $(file < .image.test) + DOCKER ?= docker DOCKERCOMPOSE ?= docker-compose DOCKERCOMPOSE_W_ENV = DEV_IMAGE_TAG=$(DEV_IMAGE_TAG) $(DOCKERCOMPOSE) -f compose.yaml -f compose.tracing.yaml REBAR ?= rebar3 TEST_CONTAINER_NAME ?= testrunner +DIST_CONTAINER_NAME ?= distrunner all: compile xref lint check-format dialyze eunit -.PHONY: dev-image clean-dev-image wc-shell test +.PHONY: dev-image test-image clean-dev-image clean-test-image wc-shell test dev-image: .image.dev @@ -36,6 +40,18 @@ ifneq ($(DEV_IMAGE_ID),) rm .image.dev endif +test-image: .image.test + +.image.test: Dockerfile.test .env + env $(DOTENV) $(DOCKERCOMPOSE_W_ENV) build $(DIST_CONTAINER_NAME) + $(DOCKER) image ls -q -f "reference=$(TEST_IMAGE_TAG)" | head -n1 > $@ + +clean-test-image: +ifneq ($(TEST_IMAGE_ID),) + $(DOCKER) image rm -f $(TEST_IMAGE_TAG) + rm .image.test +endif + DOCKER_WC_OPTIONS := -v $(PWD):$(PWD) --workdir $(PWD) DOCKER_WC_EXTRA_OPTIONS ?= --rm DOCKER_RUN = $(DOCKER) run -t $(DOCKER_WC_OPTIONS) $(DOCKER_WC_EXTRA_OPTIONS) @@ -87,7 +103,7 @@ eunit: $(REBAR) eunit --cover common-test: - $(REBAR) ct --cover --name test_node@127.0.0.1 + $(REBAR) ct --cover cover: $(REBAR) covertool generate diff --git a/apps/mg/src/mg_configurator.erl b/apps/mg/src/mg_configurator.erl index 94e8a79..d90f804 100644 --- a/apps/mg/src/mg_configurator.erl +++ b/apps/mg/src/mg_configurator.erl @@ -35,6 +35,8 @@ -type pulse() :: mg_core_pulse:handler(). +-type scaling_opts() :: #{scaling := mg_core_cluster:scaling_type()}. + -spec construct_child_specs(config()) -> [supervisor:child_spec()].
construct_child_specs( #{ @@ -46,6 +48,7 @@ construct_child_specs( Quotas = maps:get(quotas, Config, []), HealthChecks = maps:get(health_check, Config, #{}), ClusterOpts = maps:get(cluster, Config, #{}), + Scaling = maps:get(scaling, ClusterOpts, global_based), QuotasChildSpec = quotas_child_specs(Quotas, quota), EventMachinesChildSpec = events_machines_child_specs(Namespaces, Pulse), @@ -53,7 +56,7 @@ construct_child_specs( woody_server, #{ pulse => Pulse, - automaton => api_automaton_options(Namespaces, Pulse), + automaton => api_automaton_options(Namespaces, Pulse, #{scaling => Scaling}), woody_server => WoodyServer, additional_routes => [ get_startup_route(), @@ -62,7 +65,7 @@ construct_child_specs( ] } ), - ClusterSpec = mg_core_union:child_spec(ClusterOpts), + ClusterSpec = mg_core_cluster:child_spec(ClusterOpts), lists:flatten([ QuotasChildSpec, @@ -129,7 +132,8 @@ machine_options(NS, Config, Pulse) -> Options = maps:with( [ retries, - timer_processing_timeout + timer_processing_timeout, + scaling ], Config ), @@ -150,8 +154,8 @@ machine_options(NS, Config, Pulse) -> suicide_probability => maps:get(suicide_probability, Config, undefined) }. --spec api_automaton_options(namespaces(), pulse()) -> mg_woody_automaton:options(). -api_automaton_options(NSs, Pulse) -> +-spec api_automaton_options(namespaces(), pulse(), scaling_opts()) -> mg_woody_automaton:options(). +api_automaton_options(NSs, Pulse, ScalingOpts) -> maps:fold( fun(NS, ConfigNS, Options) -> Options#{ @@ -163,7 +167,7 @@ api_automaton_options(NSs, Pulse) -> ) } end, - #{}, + ScalingOpts, NSs ). diff --git a/apps/mg/src/mg_health_check.erl b/apps/mg/src/mg_health_check.erl index dcf5c9a..1a63668 100644 --- a/apps/mg/src/mg_health_check.erl +++ b/apps/mg/src/mg_health_check.erl @@ -2,10 +2,11 @@ -export([global/0]). -export([startup/0]). +-export([skip/0]). -spec global() -> {erl_health:status(), erl_health:details()}. global() -> - ClusterSize = mg_core_union:cluster_size(), + ClusterSize = mg_core_cluster:cluster_size(), ConnectedCount = erlang:length(erlang:nodes()), case is_quorum(ClusterSize, ConnectedCount) of true -> @@ -21,6 +22,10 @@ global() -> startup() -> %% maybe any checks? logger:info("union. node ~p started", [node()]), + skip(). + +-spec skip() -> {erl_health:status(), erl_health:details()}. +skip() -> {passing, []}. %% Internal functions diff --git a/apps/mg/test/mg_tests_SUITE.erl b/apps/mg/test/mg_tests_SUITE.erl index 7785bba..3b5285f 100644 --- a/apps/mg/test/mg_tests_SUITE.erl +++ b/apps/mg/test/mg_tests_SUITE.erl @@ -241,6 +241,7 @@ mg_config(#{endpoint := {IP, Port}}, C) -> storage => {exponential, infinity, 1, 10}, timers => {exponential, infinity, 1, 10} }, + scaling => global_based, % there are currently issues that prevent enabling this option permanently % (and we would really like to, in order to verify that idempotent retries work) % TODO: this needs to be done in the future diff --git a/apps/mg_core/src/mg_core_cluster.erl b/apps/mg_core/src/mg_core_cluster.erl new file mode 100644 index 0000000..dd40773 --- /dev/null +++ b/apps/mg_core/src/mg_core_cluster.erl @@ -0,0 +1,334 @@ +-module(mg_core_cluster). + +-behaviour(gen_server). + +-export([start_link/1]). +-export([ + init/1, + handle_continue/2, + handle_call/3, + handle_cast/2, + handle_info/2, + terminate/2, + code_change/3 +]). + +-export([child_spec/1]). +-export([cluster_size/0]). +-export([connecting/1]). +-export([get_node/1]). +-export([get_partitions_info/0]). + +-define(SERVER, ?MODULE). +-define(RECONNECT_TIMEOUT, 5000).
+ +-type discovery_options() :: mg_core_cluster_partitions:discovery_options(). + +-type scaling_type() :: global_based | partition_based. + +-type cluster_options() :: #{ + discovering => discovery_options(), + reconnect_timeout => non_neg_integer(), + scaling => scaling_type(), + %% partitioning required if scaling = partition_based + partitioning => mg_core_cluster_partitions:partitions_options() +}. + +-type partitions_info() :: #{ + partitioning => mg_core_cluster_partitions:partitions_options(), + local_table => mg_core_cluster_partitions:local_partition_table(), + balancing_table => mg_core_cluster_partitions:balancing_table(), + partitions_table => mg_core_cluster_partitions:partitions_table() +}. + +-type state() :: #{ + %% cluster static options + discovering => discovery_options(), + reconnect_timeout => non_neg_integer(), + scaling => scaling_type(), + partitioning => mg_core_cluster_partitions:partitions_options(), + %% dynamic + known_nodes => [node()], + local_table => mg_core_cluster_partitions:local_partition_table(), + balancing_table => mg_core_cluster_partitions:balancing_table(), + partitions_table => mg_core_cluster_partitions:partitions_table() +}. + +-export_type([scaling_type/0]). +-export_type([partitions_info/0]). +-export_type([cluster_options/0]). + +-spec child_spec(cluster_options()) -> [supervisor:child_spec()]. +child_spec(#{discovering := _} = ClusterOpts) -> + [ + #{ + id => ?MODULE, + start => {?MODULE, start_link, [ClusterOpts]} + } + ]; +child_spec(_) -> + % cluster not configured, skip + []. + +-spec cluster_size() -> non_neg_integer(). +cluster_size() -> + case whereis(?MODULE) of + undefined -> + %% for backward compatibility with consul + ReplicaCount = os:getenv("REPLICA_COUNT", "1"), + erlang:list_to_integer(ReplicaCount); + Pid when is_pid(Pid) -> + gen_server:call(Pid, get_cluster_size) + end. + +-spec connecting({mg_core_cluster_partitions:partitions_table(), node()}) -> + {ok, mg_core_cluster_partitions:local_partition_table()}. +connecting(RemoteData) -> + gen_server:call(?MODULE, {connecting, RemoteData}). + +-spec get_node(mg_core_cluster_partitions:balancing_key()) -> {ok, node()}. +get_node(BalancingKey) -> + gen_server:call(?MODULE, {get_node, BalancingKey}). + +-spec get_partitions_info() -> partitions_info(). +get_partitions_info() -> + gen_server:call(?MODULE, get_partitions_info). + +%%%=================================================================== +%%% Spawning and gen_server implementation +%%%=================================================================== +-spec start_link(cluster_options()) -> {ok, pid()} | {error, term()}. +start_link(ClusterOpts) -> + gen_server:start_link({local, ?SERVER}, ?MODULE, ClusterOpts, []). + +-spec init(cluster_options()) -> {ok, state(), {continue, {full_init, cluster_options()}}}. +init(ClusterOpts) -> + logger:info("mg_cluster. init with options: ~p", [ClusterOpts]), + {ok, #{}, {continue, {full_init, ClusterOpts}}}. + +-spec handle_continue({full_init, cluster_options()}, state()) -> {noreply, state()}. 
+handle_continue( + { + full_init, + #{ + discovering := DiscoveryOpts, + scaling := ScalingType + } = ClusterOpts + }, + _State +) -> + _ = net_kernel:monitor_nodes(true), + {ok, ListNodes} = mg_core_cluster_partitions:discovery(DiscoveryOpts), + LocalTable = mg_core_cluster_partitions:make_local_table(ScalingType), + PartitionsTable = try_connect_all(ListNodes, maps:get(reconnect_timeout, ClusterOpts), LocalTable), + BalancingTable = mg_core_cluster_partitions:make_balancing_table( + ScalingType, + PartitionsTable, + maps:get(partitioning, ClusterOpts, undefined) + ), + { + noreply, + ClusterOpts#{ + known_nodes => ListNodes, + local_table => LocalTable, + partitions_table => PartitionsTable, + balancing_table => BalancingTable + } + }. + +-spec handle_call(term(), {pid(), _}, state()) -> {reply, any(), state()}. +handle_call({get_node, BalancingKey}, _From, State) -> + Response = mg_core_cluster_partitions:get_node(BalancingKey, partitions_info(State)), + {reply, Response, State}; +handle_call(get_cluster_size, _From, #{known_nodes := ListNodes} = State) -> + {reply, erlang:length(ListNodes), State}; +handle_call(get_partitions_info, _From, State) -> + {reply, partitions_info(State), State}; +handle_call( + {connecting, {RemoteTable, _RemoteNode}}, + _From, + #{ + scaling := partition_based, + partitioning := PartitionsOpts, + local_table := LocalTable, + partitions_table := PartitionsTable + } = State +) -> + NewPartitionsTable = mg_core_cluster_partitions:add_partitions(PartitionsTable, RemoteTable), + NewBalancingTable = mg_core_cluster_partitions:make_balancing_table( + partition_based, + NewPartitionsTable, + PartitionsOpts + ), + { + reply, + {ok, LocalTable}, + State#{ + partitions_table => NewPartitionsTable, + balancing_table => NewBalancingTable + } + }; +%% Not a partition-based cluster: only the node list is updated +handle_call({connecting, {_RemoteTable, _RemoteNode}}, _From, State) -> + { + reply, + {ok, mg_core_cluster_partitions:empty_partitions()}, + State + }. + +-spec handle_cast(term(), state()) -> {noreply, state()}. +handle_cast(_Request, State) -> + {noreply, State}. + +-spec handle_info(term(), state()) -> {noreply, state()}. +handle_info({timeout, _TRef, {reconnect, Node}}, State) -> + NewState = maybe_connect(Node, State), + {noreply, NewState}; +handle_info({nodeup, RemoteNode}, #{discovering := DiscoveryOpts} = State) -> + %% nothing else to do here: partitions are rebalanced in the connecting call + logger:info("mg_cluster. ~p receive nodeup ~p", [node(), RemoteNode]), + {ok, ListNodes} = mg_core_cluster_partitions:discovery(DiscoveryOpts), + {noreply, State#{known_nodes => ListNodes}}; +handle_info( + {nodedown, RemoteNode}, + #{discovering := DiscoveryOpts, reconnect_timeout := Timeout, partitions_table := PartitionsTable} = State +) -> + %% rebalance without the downed node + logger:warning("mg_cluster. ~p receive nodedown ~p", [node(), RemoteNode]), + {ok, ListNodes} = mg_core_cluster_partitions:discovery(DiscoveryOpts), + NewPartitionsTable = mg_core_cluster_partitions:del_partition(RemoteNode, PartitionsTable), + NewState = maybe_rebalance(#{}, State#{known_nodes => ListNodes, partitions_table => NewPartitionsTable}), + _ = erlang:start_timer(Timeout, self(), {reconnect, RemoteNode}), + {noreply, NewState}. + +-spec terminate(_Reason, state()) -> ok. +terminate(_Reason, _State) -> + ok. + +-spec code_change(_OldVsn, state(), _Extra) -> {ok, state()}. +code_change(_OldVsn, State, _Extra) -> + {ok, State}.
+ +%%%=================================================================== +%%% Internal functions +%%%=================================================================== + +%% cluster functions +-spec connect(node(), non_neg_integer(), mg_core_cluster_partitions:local_partition_table()) -> + {ok, mg_core_cluster_partitions:partitions_table()} | {error, term()}. +connect(Node, ReconnectTimeout, LocalTable) when Node =/= node() -> + case net_adm:ping(Node) of + pong -> + try + erpc:call(Node, ?MODULE, connecting, [{LocalTable, node()}]) + catch + _:_ -> + _ = erlang:start_timer(ReconnectTimeout, self(), {reconnect, Node}), + {error, not_connected} + end; + pang -> + _ = erlang:start_timer(ReconnectTimeout, self(), {reconnect, Node}), + {error, not_connected} + end; +connect(_Node, _ReconnectTimeout, LocalTable) -> + {ok, LocalTable}. + +-spec try_connect_all([node()], non_neg_integer(), mg_core_cluster_partitions:local_partition_table()) -> + mg_core_cluster_partitions:partitions_table(). +try_connect_all(ListNodes, ReconnectTimeout, LocalTable) -> + lists:foldl( + fun(Node, Acc) -> + case connect(Node, ReconnectTimeout, LocalTable) of + {ok, RemoteTable} -> + mg_core_cluster_partitions:add_partitions(Acc, RemoteTable); + _ -> + Acc + end + end, + mg_core_cluster_partitions:empty_partitions(), + ListNodes + ). + +-spec maybe_connect(node(), state()) -> state(). +maybe_connect( + Node, + #{ + discovering := Opts, + local_table := LocalTable, + reconnect_timeout := ReconnectTimeout + } = State +) -> + {ok, ListNodes} = mg_core_cluster_partitions:discovery(Opts), + case lists:member(Node, ListNodes) of + false -> + %% node was removed from the cluster, do nothing (rebalancing already happened on nodedown) + State#{known_nodes => ListNodes}; + true -> + case connect(Node, ReconnectTimeout, LocalTable) of + {ok, RemoteTable} -> + %% node reconnected after a temporary split, or a new node joined; rebalance with it + maybe_rebalance(RemoteTable, State#{known_nodes => ListNodes}); + _ -> + State#{known_nodes => ListNodes} + end + end. + +-spec maybe_rebalance(mg_core_cluster_partitions:partitions_table(), state()) -> state(). +maybe_rebalance( + RemoteTable, + #{ + scaling := partition_based, + partitioning := PartitionsOpts, + partitions_table := PartitionsTable + } = State +) -> + NewPartitionsTable = mg_core_cluster_partitions:add_partitions(PartitionsTable, RemoteTable), + NewBalancingTable = mg_core_cluster_partitions:make_balancing_table( + partition_based, + NewPartitionsTable, + PartitionsOpts + ), + State#{partitions_table => NewPartitionsTable, balancing_table => NewBalancingTable}; +maybe_rebalance(_, State) -> + State. + +-spec partitions_info(state()) -> partitions_info(). +partitions_info(State) -> + maps:with([partitioning, partitions_table, balancing_table, local_table], State). + +-ifdef(TEST). +-include_lib("eunit/include/eunit.hrl"). + +-define(CLUSTER_OPTS, #{ + discovering => #{ + <<"domain_name">> => <<"localhost">>, + <<"sname">> => <<"test_node">> + }, + scaling => partition_based, + partitioning => #{ + capacity => 3, + max_hash => 4095 + }, + reconnect_timeout => ?RECONNECT_TIMEOUT +}). + +-spec test() -> _. + +-spec child_spec_test() -> _. +child_spec_test() -> + EmptyChildSpec = mg_core_cluster:child_spec(#{}), + ?assertEqual([], EmptyChildSpec), + ExpectedSpec = [ + #{ + id => mg_core_cluster, + start => { + mg_core_cluster, + start_link, + [?CLUSTER_OPTS] + } + } + ], + ChildSpec = mg_core_cluster:child_spec(?CLUSTER_OPTS), + ?assertEqual(ExpectedSpec, ChildSpec). + +-endif.
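The intended call pattern for the new cluster server, sketched for review context: a caller turns its machine reference into a balancing key, asks mg_core_cluster:get_node/1 which node owns the corresponding hash partition, and then handles the work locally or hops to the owner. A minimal sketch under that assumption; the module name mg_example_router, handle_locally/1 and the erpc hop are illustrative, not part of this change:

-module(mg_example_router).

-export([dispatch/2]).

%% Route work on machine {NS, ID} to the node owning its hash partition.
-spec dispatch(mg_core:ns(), mg_core:id()) -> term().
dispatch(NS, ID) ->
    BalancingKey = {NS, ID},
    {ok, Node} = mg_core_cluster:get_node(BalancingKey),
    case Node =:= node() of
        true ->
            %% this node owns the partition for the key
            handle_locally(BalancingKey);
        false ->
            %% illustrative remote hop; any RPC transport would do
            erpc:call(Node, ?MODULE, dispatch, [NS, ID])
    end.

-spec handle_locally(mg_core_cluster_partitions:balancing_key()) -> term().
handle_locally(BalancingKey) ->
    {handled_on, node(), BalancingKey}.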
diff --git a/apps/mg_core/src/mg_core_cluster_partitions.erl b/apps/mg_core/src/mg_core_cluster_partitions.erl new file mode 100644 index 0000000..1ed6aa7 --- /dev/null +++ b/apps/mg_core/src/mg_core_cluster_partitions.erl @@ -0,0 +1,153 @@ +-module(mg_core_cluster_partitions). + +-type discovery_options() :: #{ + %% #{<<"domain_name">> => <<"machinegun-ha-headless">>,<<"sname">> => <<"machinegun">>} + binary() => binary() +}. +-type balancing_key() :: term(). +-type hash_range() :: {non_neg_integer(), non_neg_integer()}. +-type partition() :: non_neg_integer(). +-type partitions_options() :: #{ + capacity => non_neg_integer(), + max_hash => non_neg_integer() +}. +-type balancing_table() :: #{ + hash_range() => partition() +}. +-type partitions_table() :: #{ + partition() => node() +}. +%% local and remote tables contain a single pair: the node's own partition and node name +-type local_partition_table() :: partitions_table(). +-type remote_partition_table() :: partitions_table(). + +-export_type([discovery_options/0]). +-export_type([partitions_options/0]). +-export_type([balancing_key/0]). +-export_type([partition/0]). +-export_type([balancing_table/0]). +-export_type([partitions_table/0]). +-export_type([local_partition_table/0]). +-export_type([remote_partition_table/0]). + +%% API +-export([discovery/1]). +-export([make_local_table/1]). +-export([make_balancing_table/3]). +-export([add_partitions/2]). +-export([del_partition/2]). +-export([empty_partitions/0]). +-export([get_node/2]). +-export([is_local_partition/2]). + +-spec discovery(discovery_options()) -> {ok, [node()]}. +discovery(#{<<"domain_name">> := DomainName, <<"sname">> := Sname}) -> + case get_addrs(unicode:characters_to_list(DomainName)) of + {ok, ListAddrs} -> + logger:info("mg_cluster. resolve ~p with result: ~p", [DomainName, ListAddrs]), + {ok, addrs_to_nodes(lists:uniq(ListAddrs), Sname)}; + Error -> + error({resolve_error, Error}) + end. + +-spec make_local_table(mg_core_cluster:scaling_type()) -> local_partition_table(). +make_local_table(global_based) -> + #{}; +make_local_table(partition_based) -> + {ok, Hostname} = inet:gethostname(), + {ok, HostIndex} = host_to_index(Hostname), + #{HostIndex => node()}. + +-spec make_balancing_table(mg_core_cluster:scaling_type(), partitions_table(), partitions_options() | undefined) -> + balancing_table(). +make_balancing_table(global_based, _PartitionsTable, _) -> + #{}; +make_balancing_table(partition_based, PartitionsTable, #{capacity := Capacity, max_hash := MaxHash}) -> + ListPartitions = maps:keys(PartitionsTable), + mg_core_dirange:get_ranges(MaxHash, Capacity, ListPartitions). + +-spec get_node(balancing_key(), mg_core_cluster:partitions_info()) -> {ok, node()}. +get_node(BalancingKey, PartitionsInfo) -> + #{ + partitions_table := PartitionsTable, + balancing_table := BalancingTable, + partitioning := #{max_hash := MaxHash} + } = PartitionsInfo, + {ok, Index} = mg_core_dirange:find(erlang:phash2(BalancingKey, MaxHash), BalancingTable), + Node = maps:get(Index, PartitionsTable), + {ok, Node}. + +-spec is_local_partition(balancing_key(), mg_core_cluster:partitions_info()) -> boolean(). +is_local_partition(BalancingKey, PartitionsInfo) -> + #{ + local_table := LocalTable, + balancing_table := BalancingTable, + partitioning := #{max_hash := MaxHash} + } = PartitionsInfo, + [LocalPartition] = maps:keys(LocalTable), + {ok, LocalPartition} =:= mg_core_dirange:find(erlang:phash2(BalancingKey, MaxHash), BalancingTable).
+ +-spec add_partitions(partitions_table(), partitions_table()) -> partitions_table(). +add_partitions(KnownPartitions, NewPartitions) -> + maps:merge(KnownPartitions, NewPartitions). + +-spec del_partition(node(), partitions_table()) -> partitions_table(). +del_partition(Node, PartitionsTable) -> + maps:filter(fun(_Partition, NodeName) -> NodeName =/= Node end, PartitionsTable). + +-spec empty_partitions() -> partitions_table(). +empty_partitions() -> + #{}. + +% Internal functions + +-spec get_addrs(inet:hostname()) -> {ok, [inet:ip_address()]} | {error, _}. +get_addrs(DomainName) -> + case inet:getaddrs(DomainName, inet) of + {ok, _} = Ok -> Ok; + _ -> inet:getaddrs(DomainName, inet6) + end. + +-spec addrs_to_nodes([inet:ip_address()], binary()) -> [node()]. +addrs_to_nodes(ListAddrs, Sname) -> + NodeName = unicode:characters_to_list(Sname), + lists:foldl( + fun(Addr, Acc) -> + [erlang:list_to_atom(NodeName ++ "@" ++ inet:ntoa(Addr)) | Acc] + end, + [], + ListAddrs + ). + +-spec host_to_index(string()) -> {ok, non_neg_integer()} | error. +host_to_index(MaybeFqdn) -> + [Host | _] = string:split(MaybeFqdn, ".", all), + try + [_, IndexStr] = string:split(Host, "-", trailing), + {ok, erlang:list_to_integer(IndexStr)} + catch + _:_ -> + error + end. + +-ifdef(TEST). +-include_lib("eunit/include/eunit.hrl"). + +-spec test() -> _. + +-spec get_addrs_test() -> _. +get_addrs_test() -> + {ok, [{127, 0, 0, 1} | _]} = get_addrs("localhost"), + ok. + +-spec addrs_to_nodes_test() -> _. +addrs_to_nodes_test() -> + ?assertEqual(['foo@127.0.0.1'], addrs_to_nodes([{127, 0, 0, 1}], <<"foo">>)). + +-spec host_to_index_test() -> _. +host_to_index_test() -> + ?assertEqual({ok, 0}, host_to_index("mg-0")), + ?assertEqual({ok, 1}, host_to_index("mg-1.example.com")), + ?assertEqual(error, host_to_index("ya.ru")). + +-endif. diff --git a/apps/mg_core/src/mg_core_dirange.erl b/apps/mg_core/src/mg_core_dirange.erl index 46b04f2..9202df4 100644 --- a/apps/mg_core/src/mg_core_dirange.erl +++ b/apps/mg_core/src/mg_core_dirange.erl @@ -41,6 +41,11 @@ -export([from/1]). -export([to/1]). +-export([ + get_ranges/3, + find/2 +]). + %% Directed range over integers -type dirange(_T) :: nonempty_dirange(_T) | undefined. -type direction() :: -1 | +1. @@ -48,6 +53,14 @@ % Non-empty, unambiguously oriented directed range [from..to]. {_T :: integer(), _T :: integer(), direction()}. +-type max_value() :: non_neg_integer(). +-type capacity() :: non_neg_integer(). +-type section_number() :: non_neg_integer(). +-type alive_sections() :: [section_number()]. +-type min_range_value() :: non_neg_integer(). +-type max_range_value() :: non_neg_integer(). +-type range_map() :: #{{min_range_value(), max_range_value()} => section_number()}. + %% -spec empty() -> dirange(_). @@ -204,3 +217,132 @@ to(undefined) -> undefined; to({_, B, _}) -> B. + +-spec get_ranges(max_value(), capacity(), alive_sections()) -> range_map(). +get_ranges(MaxValue, Capacity, AliveList) -> + AliveSize = erlang:length(AliveList), + AliveListSorted = lists:sort(AliveList), + FullList = lists:seq(0, Capacity - 1), + DeadList = lists:filter(fun(E) -> not lists:member(E, AliveList) end, FullList), + BaseRangeMap = distribute({0, MaxValue}, Capacity, FullList), + redistribute(BaseRangeMap, AliveSize, AliveListSorted, DeadList). + +-spec find(non_neg_integer(), range_map()) -> {ok, section_number()} | none. +find(Value, RangeMap) -> + Iterator = maps:iterator(RangeMap), + do_find(maps:next(Iterator), Value). 
+ +%% Internal functions + +-spec do_find(none | {{min_range_value(), max_range_value()}, section_number(), maps:iterator()}, non_neg_integer()) -> + {ok, section_number()} | none. +do_find(none, _) -> + none; +do_find({Range, Num, Iterator}, Value) -> + case in_range(Value, Range) of + true -> {ok, Num}; + false -> do_find(maps:next(Iterator), Value) + end. + +-spec in_range(non_neg_integer(), {min_range_value(), max_range_value()}) -> boolean(). +in_range(Value, {Min, Max}) when Value >= Min andalso Value =< Max -> true; +in_range(_, _) -> false. + +-spec distribute({min_range_value(), max_range_value()}, non_neg_integer(), [section_number()]) -> + range_map(). +distribute(Range, Size, ListSorted) -> + distribute(Range, Size, ListSorted, #{}). + +-spec distribute({min_range_value(), max_range_value()}, non_neg_integer(), [section_number()], range_map()) -> + range_map(). +distribute({Min, Max}, Size, ListSorted, Acc) -> + Delta = not_zero(((Max - Min) div Size) - 1), + SizeFromZero = Size - 1, + {_, Result} = lists:foldl( + fun + (Num, {StartPos, Map}) when Num =:= SizeFromZero -> + %% because Erlang lists are indexed from 1 + {Max, Map#{{StartPos, Max} => lists:nth(Num + 1, ListSorted)}}; + (Num, {StartPos, Map}) -> + MaxVal = StartPos + Delta, + {MaxVal + 1, Map#{{StartPos, MaxVal} => lists:nth(Num + 1, ListSorted)}} + end, + {Min, Acc}, + lists:seq(0, SizeFromZero) + ), + Result. + +-spec redistribute(range_map(), non_neg_integer(), [section_number()], [section_number()]) -> range_map(). +redistribute(BaseRangeMap, AliveSize, AliveListSorted, DeadList) -> + maps:fold( + fun(Range, RangeNum, Acc) -> + case lists:member(RangeNum, DeadList) of + true -> + distribute(Range, AliveSize, AliveListSorted, Acc); + false -> + Acc#{Range => RangeNum} + end + end, + #{}, + BaseRangeMap + ). + +-spec not_zero(non_neg_integer()) -> non_neg_integer(). +not_zero(0) -> 1; +not_zero(Value) -> Value. + +%% Tests + +-ifdef(TEST). +-include_lib("eunit/include/eunit.hrl"). + +-spec test() -> _. + +-spec without_dead_test() -> _. +without_dead_test() -> + ?assertEqual( + #{{0, 3} => 0, {4, 7} => 1, {8, 11} => 2, {12, 16} => 3}, + get_ranges(16, 4, [0, 1, 2, 3]) + ), + ?assertEqual( + #{{0, 3} => 0, {4, 7} => 1, {8, 11} => 2, {12, 17} => 3}, + get_ranges(17, 4, [0, 1, 2, 3]) + ), + ?assertEqual( + #{{0, 4} => 0, {5, 9} => 1, {10, 14} => 2, {15, 21} => 3}, + get_ranges(21, 4, [0, 1, 2, 3]) + ). + +-spec with_dead_test() -> _. +with_dead_test() -> + ?assertEqual( + #{ + {0, 3} => 0, + {4, 5} => 0, + {6, 7} => 2, + {8, 7} => 3, + {8, 11} => 2, + {12, 16} => 3 + }, + get_ranges(16, 4, [0, 2, 3]) + ), + ?assertEqual( + #{ + {0, 3} => 0, + {4, 5} => 0, + {6, 7} => 2, + {8, 11} => 2, + {12, 13} => 0, + {14, 16} => 2 + }, + get_ranges(16, 4, [0, 2]) + ). + +-spec find_test() -> _. +find_test() -> + RangeMap = get_ranges(16, 4, [0, 2]), + ?assertEqual({ok, 0}, find(5, RangeMap)), + ?assertEqual({ok, 2}, find(10, RangeMap)), + ?assertEqual(none, find(100, RangeMap)). + +-endif.
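A worked example of the new range arithmetic, with values taken from the unit tests above: get_ranges/3 first splits the hash space 0..MaxValue evenly across all Capacity partitions, then re-splits every base range owned by a dead partition among the alive ones, which is why dead partitions' base ranges reappear as nested sub-ranges. Note the degenerate {8,7} range in the first with_dead_test expectation: when a re-split does not divide evenly, the last alive partition can receive an inverted range, which in_range/2 can never match. A shell session sketch (map print order may differ):

1> mg_core_dirange:get_ranges(16, 4, [0, 1, 2, 3]).
#{{0,3} => 0, {4,7} => 1, {8,11} => 2, {12,16} => 3}
2> mg_core_dirange:get_ranges(16, 4, [0, 2]).
#{{0,3} => 0, {4,5} => 0, {6,7} => 2, {8,11} => 2, {12,13} => 0, {14,16} => 2}
3> mg_core_dirange:find(13, mg_core_dirange:get_ranges(16, 4, [0, 2])).
{ok,0}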
diff --git a/apps/mg_core/src/mg_core_machine.erl b/apps/mg_core/src/mg_core_machine.erl index 3d8e2fc..695af93 100644 --- a/apps/mg_core/src/mg_core_machine.erl +++ b/apps/mg_core/src/mg_core_machine.erl @@ -164,6 +164,7 @@ -type options() :: #{ namespace := mg_core:ns(), pulse := mg_core_pulse:handler(), + scaling := mg_core_cluster:scaling_type(), storage => storage_options(), notification => mg_core_notification:options(), processor => mg_core_utils:mod_opts(), @@ -1416,9 +1417,9 @@ manager_options(Options = #{namespace := NS, worker := ManagerOptions, pulse := }. -spec storage_options(options()) -> mg_core_storage:options(). -storage_options(#{namespace := NS, storage := StorageOptions, pulse := Handler}) -> +storage_options(#{namespace := NS, storage := StorageOptions, pulse := Handler, scaling := Scaling}) -> {Mod, Options} = mg_core_utils:separate_mod_opts(StorageOptions, #{}), - {Mod, Options#{name => {NS, ?MODULE, machines}, pulse => Handler}}. + {Mod, Options#{name => {NS, ?MODULE, machines}, pulse => Handler, scaling => Scaling}}. -spec notification_options(options()) -> mg_core_notification:options(). notification_options(#{notification := NotificationOptions}) -> @@ -1501,7 +1502,8 @@ scheduler_options(HandlerMod, Options, HandlerOptions, Config) -> max_scan_limit => maps:get(max_scan_limit, Config, undefined), scan_ahead => maps:get(scan_ahead, Config, undefined), task_handler => Handler, - pulse => Pulse + pulse => Pulse, + scaling => maps:get(scaling, Options, global_based) }). -spec scheduler_cutoff(scheduler_opt()) -> seconds(). diff --git a/apps/mg_core/src/mg_core_notification.erl b/apps/mg_core/src/mg_core_notification.erl index d961bc7..f3bfa4b 100644 --- a/apps/mg_core/src/mg_core_notification.erl +++ b/apps/mg_core/src/mg_core_notification.erl @@ -18,7 +18,8 @@ -type options() :: #{ namespace := mg_core:ns(), pulse := mg_core_pulse:handler(), - storage := storage_options() + storage := storage_options(), + scaling => mg_core_cluster:scaling_type() }. -export_type([id/0]). @@ -123,6 +124,7 @@ data_to_opaque(#{ %% -spec storage_options(options()) -> mg_core_storage:options(). -storage_options(#{namespace := NS, storage := StorageOptions, pulse := Handler}) -> +storage_options(#{namespace := NS, storage := StorageOptions, pulse := Handler} = Opts) -> + Scaling = maps:get(scaling, Opts, global_based), {Mod, Options} = mg_core_utils:separate_mod_opts(StorageOptions, #{}), - {Mod, Options#{name => {NS, ?MODULE, notifications}, pulse => Handler}}. + {Mod, Options#{name => {NS, ?MODULE, notifications}, pulse => Handler, scaling => Scaling}}. diff --git a/apps/mg_core/src/mg_core_queue_notifications.erl b/apps/mg_core/src/mg_core_queue_notifications.erl index 2359e96..04270d2 100644 --- a/apps/mg_core/src/mg_core_queue_notifications.erl +++ b/apps/mg_core/src/mg_core_queue_notifications.erl @@ -44,7 +44,8 @@ rescan_delay => milliseconds(), scan_handicap => seconds(), scan_cutoff => seconds(), - reschedule_time => seconds() + reschedule_time => seconds(), + scaling => mg_core_cluster:scaling_type() }. -record(state, { @@ -176,8 +177,9 @@ machine_options(#{machine := MachineOptions}) -> MachineOptions. -spec notification_options(options()) -> mg_core_notification:options(). -notification_options(#{notification := NotificationOptions}) -> - NotificationOptions. +notification_options(#{notification := NotificationOptions, machine := Machine}) -> + Scaling = maps:get(scaling, Machine, global_based), + NotificationOptions#{scaling => Scaling}. 
-spec create_task(options(), mg_core_notification:id(), target_time()) -> task(). create_task(Options, NotificationID, Timestamp) -> diff --git a/apps/mg_core/src/mg_core_queue_scanner.erl b/apps/mg_core/src/mg_core_queue_scanner.erl index 6028d2d..e411422 100644 --- a/apps/mg_core/src/mg_core_queue_scanner.erl +++ b/apps/mg_core/src/mg_core_queue_scanner.erl @@ -40,7 +40,8 @@ scan_ahead => scan_ahead(), retry_scan_delay => scan_delay(), squad_opts => mg_core_gen_squad:opts(), - pulse => mg_core_pulse:handler() + pulse => mg_core_pulse:handler(), + scaling => mg_core_cluster:scaling_type() }. -export_type([options/0]). @@ -152,7 +153,8 @@ where_is(SchedulerID) -> scan_ahead :: scan_ahead(), retry_delay :: scan_delay(), timer :: reference() | undefined, - pulse :: mg_core_pulse:handler() | undefined + pulse :: mg_core_pulse:handler() | undefined, + scaling :: mg_core_cluster:scaling_type() }). -type st() :: #st{}. @@ -168,10 +170,13 @@ init({SchedulerID, Options}) -> max_limit = maps:get(max_scan_limit, Options, ?DEFAULT_MAX_LIMIT), scan_ahead = maps:get(scan_ahead, Options, ?DEFAULT_SCAN_AHEAD), retry_delay = maps:get(retry_scan_delay, Options, ?DEFAULT_RETRY_SCAN_DELAY), - pulse = maps:get(pulse, Options, undefined) + pulse = maps:get(pulse, Options, undefined), + scaling = maps:get(scaling, Options, global_based) }}. -spec discover(st()) -> {ok, [pid()], st()}. +discover(St = #st{scaling = partition_based}) -> + {ok, [], St}; discover(St = #st{scheduler_id = SchedulerID}) -> Nodes = erlang:nodes(), Pids = multicall(Nodes, ?MODULE, where_is, [SchedulerID], ?DISCOVER_TIMEOUT), diff --git a/apps/mg_core/src/mg_core_scheduler_sup.erl b/apps/mg_core/src/mg_core_scheduler_sup.erl index 11fc809..cc8549a 100644 --- a/apps/mg_core/src/mg_core_scheduler_sup.erl +++ b/apps/mg_core/src/mg_core_scheduler_sup.erl @@ -30,6 +30,7 @@ scan_ahead => mg_core_queue_scanner:scan_ahead(), retry_scan_delay => mg_core_queue_scanner:scan_delay(), squad_opts => mg_core_gen_squad:opts(), + scaling => mg_core_cluster:scaling_type(), % workers task_handler := mg_core_utils:mod_opts(), % common @@ -59,7 +60,7 @@ start_link(SchedulerID, Options) -> Options ), ScannerOptions = maps:with( - [queue_handler, max_scan_limit, scan_ahead, retry_scan_delay, squad_opts, pulse], + [queue_handler, max_scan_limit, scan_ahead, retry_scan_delay, squad_opts, pulse, scaling], Options ), WorkerOptions = maps:with( diff --git a/apps/mg_core/src/mg_core_storage.erl b/apps/mg_core/src/mg_core_storage.erl index e84459e..1571173 100644 --- a/apps/mg_core/src/mg_core_storage.erl +++ b/apps/mg_core/src/mg_core_storage.erl @@ -175,6 +175,8 @@ get(Options, Key) -> do_request(Options, {get, Key}). -spec search(options(), index_query()) -> search_result(). +search({_, #{scaling := partition_based, name := Name}} = Options, Query) -> + filter_by_partition(Name, do_request(Options, {search, Query})); search(Options, Query) -> do_request(Options, {search, Query}). @@ -283,6 +285,28 @@ sidecar_child_spec(Options, ChildID) -> undefined end. +-spec filter_by_partition(name(), search_result()) -> search_result(). 
+filter_by_partition({NS, _, _}, Data) when is_list(Data) -> + PartitionsInfo = mg_core_cluster:get_partitions_info(), + lists:filter( + fun + ({_Index, Key}) -> do_filter_by_partition({NS, Key}, PartitionsInfo); + (Key) -> do_filter_by_partition({NS, Key}, PartitionsInfo) + end, + Data + ); +filter_by_partition({_NS, _, _} = Name, {Data, Continuation}) -> + {filter_by_partition(Name, Data), Continuation}; +filter_by_partition(_Name, Data) -> + Data. + +-spec do_filter_by_partition( + mg_core_cluster_partitions:balancing_key(), + mg_core_cluster:partitions_info() +) -> boolean(). +do_filter_by_partition(BalancingKey, PartitionsInfo) -> + mg_core_cluster_partitions:is_local_partition(BalancingKey, PartitionsInfo). + %% %% logging %% diff --git a/apps/mg_core/src/mg_core_union.erl b/apps/mg_core/src/mg_core_union.erl deleted file mode 100644 index c09c97a..0000000 --- a/apps/mg_core/src/mg_core_union.erl +++ /dev/null @@ -1,246 +0,0 @@ --module(mg_core_union). - --behaviour(gen_server). - --export([start_link/1]). --export([ - init/1, - handle_continue/2, - handle_call/3, - handle_cast/2, - handle_info/2, - terminate/2, - code_change/3 -]). - --export([child_spec/1]). --export([discovery/1]). --export([cluster_size/0]). - --ifdef(TEST). --export([set_state/1]). --endif. - --define(SERVER, ?MODULE). --define(RECONNECT_TIMEOUT, 5000). - --type discovery_options() :: #{ - module := module(), - %% options is module specific structure - options := term() -}. --type dns_discovery_options() :: #{ - %% #{<<"domain_name">> => <<"machinegun-ha-headless">>,<<"sname">> => <<"machinegun">>} - binary() => binary() -}. --type cluster_options() :: #{ - discovery => discovery_options(), - reconnect_timeout => non_neg_integer() -}. --type state() :: #{ - known_nodes => [node()], - discovery => discovery_options(), - reconnect_timeout => non_neg_integer() -}. - -%% discovery behaviour callback --callback discovery(dns_discovery_options()) -> {ok, [node()]}. - --spec child_spec(cluster_options()) -> [supervisor:child_spec()]. -child_spec(#{discovery := _} = ClusterOpts) -> - [ - #{ - id => ?MODULE, - start => {?MODULE, start_link, [ClusterOpts]} - } - ]; -child_spec(_) -> - % cluster not configured, skip - []. - --spec discovery(dns_discovery_options()) -> {ok, [node()]}. -discovery(#{<<"domain_name">> := DomainName, <<"sname">> := Sname}) -> - case get_addrs(unicode:characters_to_list(DomainName)) of - {ok, ListAddrs} -> - logger:info("union. resolve ~p with result: ~p", [DomainName, ListAddrs]), - {ok, addrs_to_nodes(lists:uniq(ListAddrs), Sname)}; - Error -> - error({resolve_error, Error}) - end. - --ifdef(TEST). --spec set_state(state()) -> ok. -set_state(NewState) -> - gen_server:call(?MODULE, {set_state, NewState}). --endif. - --spec cluster_size() -> non_neg_integer(). -cluster_size() -> - case whereis(?MODULE) of - undefined -> - ReplicaCount = os:getenv("REPLICA_COUNT", "1"), - erlang:list_to_integer(ReplicaCount); - Pid when is_pid(Pid) -> - gen_server:call(Pid, get_cluster_size) - end. - -%%%=================================================================== -%%% Spawning and gen_server implementation -%%%=================================================================== --spec start_link(cluster_options()) -> {ok, pid()} | {error, term()}. -start_link(ClusterOpts) -> - gen_server:start_link({local, ?SERVER}, ?MODULE, ClusterOpts, []). - --spec init(cluster_options()) -> {ok, state(), {continue, {full_init, cluster_options()}}}. -init(ClusterOpts) -> - logger:info("union. 
init with options: ~p", [ClusterOpts]), - {ok, #{}, {continue, {full_init, ClusterOpts}}}. - --spec handle_continue({full_init, cluster_options()}, state()) -> {noreply, state()}. -handle_continue({full_init, #{discovery := #{module := Mod, options := Opts}} = ClusterOpts}, _State) -> - _ = net_kernel:monitor_nodes(true), - {ok, ListNodes} = Mod:discovery(Opts), - _ = try_connect_all(ListNodes, maps:get(reconnect_timeout, ClusterOpts)), - {noreply, ClusterOpts#{known_nodes => ListNodes}}. - --spec handle_call(term(), {pid(), _}, state()) -> {reply, any(), state()}. --ifdef(TEST). -handle_call({set_state, NewState}, _From, _State) -> - {reply, ok, NewState}; -handle_call(get_cluster_size, _From, #{known_nodes := ListNodes} = State) -> - {reply, erlang:length(ListNodes), State}. --else. -handle_call(get_cluster_size, _From, #{known_nodes := ListNodes} = State) -> - {reply, erlang:length(ListNodes), State}. --endif. - --spec handle_cast(term(), state()) -> {noreply, state()}. -handle_cast(_Request, State) -> - {noreply, State}. - --spec handle_info(term(), state()) -> {noreply, state()}. -handle_info({timeout, _TRef, {reconnect, Node}}, State) -> - ListNodes = maybe_connect(Node, State), - {noreply, State#{known_nodes => ListNodes}}; -handle_info({nodeup, RemoteNode}, #{known_nodes := ListNodes} = State) -> - logger:info("union. ~p receive nodeup ~p", [node(), RemoteNode]), - NewState = - case lists:member(RemoteNode, ListNodes) of - true -> - %% well known node connected, do nothing - State; - false -> - %% new node connected, need update list nodes - #{discovery := #{module := Mod, options := Opts}, reconnect_timeout := Timeout} = State, - {ok, NewListNodes} = Mod:discovery(Opts), - _ = try_connect_all(NewListNodes, Timeout), - State#{known_nodes => NewListNodes} - end, - {noreply, NewState}; -handle_info({nodedown, RemoteNode}, #{reconnect_timeout := Timeout} = State) -> - logger:warning("union. ~p receive nodedown ~p", [node(), RemoteNode]), - _ = erlang:start_timer(Timeout, self(), {reconnect, RemoteNode}), - {noreply, State}. - --spec terminate(_Reason, state()) -> ok. -terminate(_Reason, _State) -> - ok. - --spec code_change(_OldVsn, state(), _Extra) -> {ok, state()}. -code_change(_OldVsn, State, _Extra) -> - {ok, State}. - -%%%=================================================================== -%%% Internal functions -%%%=================================================================== - -%% cluster functions --spec connect(node(), non_neg_integer()) -> ok | error. -connect(Node, ReconnectTimeout) -> - case net_adm:ping(Node) of - pong -> - ok; - _ -> - _ = erlang:start_timer(ReconnectTimeout, self(), {reconnect, Node}), - error - end. - --spec try_connect_all([node()], non_neg_integer()) -> ok. -try_connect_all(ListNodes, ReconnectTimeout) -> - _ = lists:foreach(fun(Node) -> connect(Node, ReconnectTimeout) end, ListNodes). - --spec maybe_connect(node(), state()) -> [node()]. -maybe_connect(Node, #{discovery := #{module := Mod, options := Opts}, reconnect_timeout := Timeout}) -> - {ok, ListNodes} = Mod:discovery(Opts), - case lists:member(Node, ListNodes) of - false -> - %% node deleted from cluster, do nothing - skip; - true -> - connect(Node, Timeout) - end, - ListNodes. - -%% discovery functions --spec get_addrs(inet:hostname()) -> {ok, [inet:ip_address()]} | {error, _}. -get_addrs(DomainName) -> - case inet:getaddrs(DomainName, inet) of - {ok, _} = Ok -> Ok; - _ -> inet:getaddrs(DomainName, inet6) - end. - --spec addrs_to_nodes([inet:ip_address()], binary()) -> [node()]. 
-addrs_to_nodes(ListAddrs, Sname) -> - NodeName = unicode:characters_to_list(Sname), - lists:foldl( - fun(Addr, Acc) -> - [erlang:list_to_atom(NodeName ++ "@" ++ inet:ntoa(Addr)) | Acc] - end, - [], - ListAddrs - ). - --ifdef(TEST). --include_lib("eunit/include/eunit.hrl"). - --define(CLUSTER_OPTS, #{ - discovery => #{ - module => mg_core_union, - options => #{ - <<"domain_name">> => <<"localhost">>, - <<"sname">> => <<"test_node">> - } - }, - reconnect_timeout => ?RECONNECT_TIMEOUT -}). - --spec test() -> _. - --spec connect_error_test() -> _. -connect_error_test() -> - ?assertEqual(error, connect('foo@127.0.0.1', 3000)). - --spec child_spec_test() -> _. -child_spec_test() -> - EmptyChildSpec = mg_core_union:child_spec(#{}), - ?assertEqual([], EmptyChildSpec), - ExpectedSpec = [ - #{ - id => mg_core_union, - start => { - mg_core_union, - start_link, - [?CLUSTER_OPTS] - } - } - ], - ChildSpec = mg_core_union:child_spec(?CLUSTER_OPTS), - ?assertEqual(ExpectedSpec, ChildSpec). - --spec for_full_cover_test() -> _. -for_full_cover_test() -> - ?assertEqual({noreply, #{}}, handle_cast([], #{})), - ?assertEqual(ok, terminate(term, #{})), - ?assertEqual({ok, #{}}, code_change(old, #{}, extra)). - --endif. diff --git a/apps/mg_core/test/mg_core_cluster_SUITE.erl b/apps/mg_core/test/mg_core_cluster_SUITE.erl new file mode 100644 index 0000000..8f56a48 --- /dev/null +++ b/apps/mg_core/test/mg_core_cluster_SUITE.erl @@ -0,0 +1,337 @@ +-module(mg_core_cluster_SUITE). + +-include_lib("eunit/include/eunit.hrl"). +-include_lib("common_test/include/ct.hrl"). + +%% API +-export([ + init_per_suite/1, + end_per_suite/1, + all/0, + groups/0 +]). + +-define(RECONNECT_TIMEOUT, 2000). +-define(CLUSTER_OPTS, #{ + discovering => #{ + <<"domain_name">> => <<"machinegun-ha-headless">>, + <<"sname">> => <<"mg">> + }, + scaling => partition_based, + partitioning => #{ + capacity => 5, + max_hash => 4095 + }, + reconnect_timeout => ?RECONNECT_TIMEOUT +}). + +-export([base_test/1]). +-export([reconnect_rebalance_test/1]). +-export([connecting_rebalance_test/1]). +-export([double_connecting_test/1]). +-export([deleted_node_down_test/1]). + +-type config() :: [{atom(), term()}]. +-type test_case_name() :: atom(). +-type group_name() :: atom(). +-type test_result() :: any() | no_return(). + +-define(PARTITIONS_INFO_WITH_PEER(Local, Remote), #{ + partitioning => #{ + capacity => 5, + max_hash => 4095 + }, + balancing_table => #{ + {0, 818} => 0, + {819, 1637} => 1, + {1638, 2046} => 0, + {2047, 2456} => 1, + {2457, 2865} => 0, + {2866, 3275} => 1, + {3276, 3684} => 0, + {3685, 4095} => 1 + }, + local_table => #{ + 0 => Local + }, + partitions_table => #{ + 0 => Local, + 1 => Remote + } +}). +-define(PARTITIONS_INFO_WO_PEER(Local), #{ + partitioning => #{ + capacity => 5, + max_hash => 4095 + }, + balancing_table => #{ + {0, 818} => 0, + {819, 1637} => 0, + {1638, 2456} => 0, + {2457, 3275} => 0, + {3276, 4095} => 0 + }, + local_table => #{ + 0 => Local + }, + partitions_table => #{ + 0 => Local + } +}). +%% erlang:phash2({<<"Namespace">>, <<"ID">>}, 4095) = 1075 +-define(KEY, {<<"Namespace">>, <<"ID">>}). + +-define(ERLANG_TEST_HOSTS, [ + "mg-0", + "mg-1", + "mg-2", + "mg-3", + "mg-4" +]). + +-define(HOSTS_TEMPLATE, << + "127.0.0.1 localhost\n", + "::1 localhost ip6-localhost ip6-loopback\n", + "fe00::0 ip6-localnet\n", + "ff00::0 ip6-mcastprefix\n", + "ff02::1 ip6-allnodes\n", + "ff02::2 ip6-allrouters\n" +>>). 
+ +-define(LOCAL_NODE(Config), begin + {local_node, LocalNode} = lists:keyfind(local_node, 1, Config), + LocalNode +end). + +-define(REMOTE_NODE(Config), begin + {remote_node, RemoteNode} = lists:keyfind(remote_node, 1, Config), + RemoteNode +end). + +-spec init_per_suite(_) -> _. +init_per_suite(Config) -> + HostsTable = lists:foldl( + fun(Host, Acc) -> + {ok, Addr} = inet:getaddr(Host, inet), + Acc#{unicode:characters_to_binary(Host) => unicode:characters_to_binary(inet:ntoa(Addr))} + end, + #{}, + ?ERLANG_TEST_HOSTS + ), + _ = prepare_cluster(HostsTable, [<<"mg-0">>, <<"mg-1">>]), + _ = instance_up(<<"mg-1">>), + LocalAddr = maps:get(<<"mg-0">>, HostsTable), + LocalNode = erlang:binary_to_atom(<<"mg@", LocalAddr/binary>>), + RemoteAddr = maps:get(<<"mg-1">>, HostsTable), + RemoteNode = erlang:binary_to_atom(<<"mg@", RemoteAddr/binary>>), + _ = await_peer(RemoteNode, 5), + [ + {local_node, LocalNode}, + {remote_node, RemoteNode}, + {hosts_table, HostsTable} + | Config + ]. + +-spec end_per_suite(_) -> _. +end_per_suite(_Config) -> + _ = instance_down(<<"mg-1">>), + ok. + +-spec test() -> _. + +-spec all() -> [{group, test_case_name()}]. +all() -> + [{group, basic_operations}]. + +-spec groups() -> [{group_name(), list(), [test_case_name()]}]. +groups() -> + [ + {basic_operations, [], [ + base_test, + reconnect_rebalance_test, + connecting_rebalance_test, + double_connecting_test, + deleted_node_down_test + ]} + ]. + +-spec base_test(config()) -> test_result(). +base_test(Config) -> + {ok, Pid} = mg_core_cluster:start_link(?CLUSTER_OPTS), + ?assertEqual( + ?PARTITIONS_INFO_WITH_PEER(?LOCAL_NODE(Config), ?REMOTE_NODE(Config)), + mg_core_cluster:get_partitions_info() + ), + ?assertEqual(2, mg_core_cluster:cluster_size()), + ?assertEqual({ok, ?REMOTE_NODE(Config)}, mg_core_cluster:get_node(?KEY)), + ?assertEqual( + false, + mg_core_cluster_partitions:is_local_partition( + ?KEY, + ?PARTITIONS_INFO_WITH_PEER(?LOCAL_NODE(Config), ?REMOTE_NODE(Config)) + ) + ), + ?assertEqual( + true, + mg_core_cluster_partitions:is_local_partition( + ?KEY, + ?PARTITIONS_INFO_WO_PEER(?LOCAL_NODE(Config)) + ) + ), + exit(Pid, normal). + +-spec reconnect_rebalance_test(config()) -> test_result(). +reconnect_rebalance_test(Config) -> + %% node_down - rebalance - reconnect by timer - rebalance + {ok, Pid} = mg_core_cluster:start_link(?CLUSTER_OPTS), + Pid ! {nodedown, ?REMOTE_NODE(Config)}, + ?assertEqual(?PARTITIONS_INFO_WO_PEER(?LOCAL_NODE(Config)), mg_core_cluster:get_partitions_info()), + ?assertEqual({ok, node()}, mg_core_cluster:get_node(?KEY)), + ?assertEqual( + true, + mg_core_cluster_partitions:is_local_partition(?KEY, ?PARTITIONS_INFO_WO_PEER(?LOCAL_NODE(Config))) + ), + + %% wait reconnecting + timer:sleep(?RECONNECT_TIMEOUT + 10), + ?assertEqual( + ?PARTITIONS_INFO_WITH_PEER(?LOCAL_NODE(Config), ?REMOTE_NODE(Config)), + mg_core_cluster:get_partitions_info() + ), + ?assertEqual({ok, ?REMOTE_NODE(Config)}, mg_core_cluster:get_node(?KEY)), + ?assertEqual( + false, + mg_core_cluster_partitions:is_local_partition( + ?KEY, + ?PARTITIONS_INFO_WITH_PEER(?LOCAL_NODE(Config), ?REMOTE_NODE(Config)) + ) + ), + exit(Pid, normal). + +-spec connecting_rebalance_test(config()) -> test_result(). +connecting_rebalance_test(Config) -> + {ok, Pid} = mg_core_cluster:start_link(?CLUSTER_OPTS), + Pid ! 
{nodedown, ?REMOTE_NODE(Config)}, + ?assertEqual(?PARTITIONS_INFO_WO_PEER(?LOCAL_NODE(Config)), mg_core_cluster:get_partitions_info()), + ?assertEqual({ok, node()}, mg_core_cluster:get_node(?KEY)), + + %% force connecting + ?assertEqual( + {ok, #{0 => node()}}, + mg_core_cluster:connecting({#{1 => ?REMOTE_NODE(Config)}, ?REMOTE_NODE(Config)}) + ), + ?assertEqual( + ?PARTITIONS_INFO_WITH_PEER(?LOCAL_NODE(Config), ?REMOTE_NODE(Config)), + mg_core_cluster:get_partitions_info() + ), + ?assertEqual({ok, ?REMOTE_NODE(Config)}, mg_core_cluster:get_node(?KEY)), + exit(Pid, normal). + +-spec double_connecting_test(config()) -> test_result(). +double_connecting_test(Config) -> + {ok, Pid} = mg_core_cluster:start_link(?CLUSTER_OPTS), + ?assertEqual( + ?PARTITIONS_INFO_WITH_PEER(?LOCAL_NODE(Config), ?REMOTE_NODE(Config)), + mg_core_cluster:get_partitions_info() + ), + %% double connect + ?assertEqual( + {ok, #{0 => node()}}, + mg_core_cluster:connecting({#{1 => ?REMOTE_NODE(Config)}, ?REMOTE_NODE(Config)}) + ), + ?assertEqual( + ?PARTITIONS_INFO_WITH_PEER(?LOCAL_NODE(Config), ?REMOTE_NODE(Config)), + mg_core_cluster:get_partitions_info() + ), + exit(Pid, normal). + +-spec deleted_node_down_test(config()) -> test_result(). +deleted_node_down_test(Config) -> + {ok, Pid} = mg_core_cluster:start_link(?CLUSTER_OPTS), + ?assertEqual( + ?PARTITIONS_INFO_WITH_PEER(?LOCAL_NODE(Config), ?REMOTE_NODE(Config)), + mg_core_cluster:get_partitions_info() + ), + Pid ! {nodedown, 'foo@127.0.0.1'}, + ?assertEqual( + ?PARTITIONS_INFO_WITH_PEER(?LOCAL_NODE(Config), ?REMOTE_NODE(Config)), + mg_core_cluster:get_partitions_info() + ), + exit(Pid, normal). + +%% Internal functions + +-spec prepare_cluster(_, _) -> _. +prepare_cluster(HostsTable, HostsToUp) -> + %% prepare headless emulation records + HeadlessRecords = lists:foldl( + fun(Host, Acc) -> + Address = maps:get(Host, HostsTable), + <> + end, + <<"\n">>, + HostsToUp + ), + + %% prepare hosts files for each node + lists:foreach( + fun(Host) -> + Address = maps:get(Host, HostsTable), + Payload = << + ?HOSTS_TEMPLATE/binary, + Address/binary, + " ", + Host/binary, + "\n", + HeadlessRecords/binary + >>, + Filename = unicode:characters_to_list(Host) ++ "_hosts", + ok = file:write_file(Filename, Payload) + end, + HostsToUp + ), + + %% distribute hosts files + lists:foreach( + fun + (<<"mg-0">>) -> + LocalFile = "mg-0_hosts", + RemoteFile = "/etc/hosts", + Cp = os:find_executable("cp"), + CMD = Cp ++ " -f " ++ LocalFile ++ " " ++ RemoteFile, + os:cmd(CMD); + (Host) -> + HostString = unicode:characters_to_list(Host), + LocalFile = HostString ++ "_hosts", + RemoteFile = HostString ++ ":/etc/hosts", + SshPass = os:find_executable("sshpass"), + Scp = os:find_executable("scp"), + CMD = SshPass ++ " -p security " ++ Scp ++ " " ++ LocalFile ++ " " ++ RemoteFile, + os:cmd(CMD) + end, + HostsToUp + ). + +-spec instance_up(_) -> _. +instance_up(Host) -> + Ssh = os:find_executable("ssh"), + CMD = Ssh ++ " " ++ unicode:characters_to_list(Host) ++ " /opt/mg/bin/entrypoint.sh", + spawn(fun() -> os:cmd(CMD) end). + +-spec instance_down(_) -> _. +instance_down(Host) -> + Ssh = os:find_executable("ssh"), + CMD = Ssh ++ " " ++ unicode:characters_to_list(Host) ++ " /opt/mg/bin/mg stop", + spawn(fun() -> os:cmd(CMD) end). + +-spec await_peer(_, _) -> _. +await_peer(_RemoteNode, 0) -> + error(peer_not_started); +await_peer(RemoteNode, Attempt) -> + case net_adm:ping(RemoteNode) of + pong -> + ok; + pang -> + timer:sleep(1000), + await_peer(RemoteNode, Attempt - 1) + end. 
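Why the suite expects the remote node for ?KEY: per the comment above, erlang:phash2({<<"Namespace">>, <<"ID">>}, 4095) evaluates to 1075, and in ?PARTITIONS_INFO_WITH_PEER the range {819, 1637} belongs to partition 1, which partitions_table maps to the remote node. A shell sketch, assuming BalancingTable is bound to the balancing_table value from that macro:

1> erlang:phash2({<<"Namespace">>, <<"ID">>}, 4095).
1075
2> mg_core_dirange:find(1075, BalancingTable).
{ok,1}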
diff --git a/apps/mg_core/test/mg_core_continuation_retry_SUITE.erl b/apps/mg_core/test/mg_core_continuation_retry_SUITE.erl index 05e7d0b..2baef5a 100644 --- a/apps/mg_core/test/mg_core_continuation_retry_SUITE.erl +++ b/apps/mg_core/test/mg_core_continuation_retry_SUITE.erl @@ -141,7 +141,8 @@ automaton_options() -> }, retries => #{ continuation => {intervals, ?TEST_INTERVALS} - } + }, + scaling => global_based }. -spec handle_beat(_, mg_core_pulse:beat()) -> ok. diff --git a/apps/mg_core/test/mg_core_events_machine_SUITE.erl b/apps/mg_core/test/mg_core_events_machine_SUITE.erl index 1e43fa6..5fa4ca1 100644 --- a/apps/mg_core/test/mg_core_events_machine_SUITE.erl +++ b/apps/mg_core/test/mg_core_events_machine_SUITE.erl @@ -436,6 +436,7 @@ events_machine_options(Base, StorageOptions, ProcessorOptions, NS) -> namespace => NS, processor => {?MODULE, ProcessorOptions}, machines => #{ + scaling => partition_based, namespace => NS, storage => mg_cth:build_storage(NS, Storage), worker => #{ diff --git a/apps/mg_core/test/mg_core_events_modernizer_SUITE.erl b/apps/mg_core/test/mg_core_events_modernizer_SUITE.erl index d241b72..bfa180e 100644 --- a/apps/mg_core/test/mg_core_events_modernizer_SUITE.erl +++ b/apps/mg_core/test/mg_core_events_modernizer_SUITE.erl @@ -202,7 +202,8 @@ events_machine_options(ProcessorOptions, NS) -> pulse => ?MODULE, storage => mg_core_storage_memory }, - pulse => Pulse + pulse => Pulse, + scaling => global_based }, events_storage => mg_cth:build_storage(<>, Storage) }. diff --git a/apps/mg_core/test/mg_core_events_stash_SUITE.erl b/apps/mg_core/test/mg_core_events_stash_SUITE.erl index bd685e2..696d899 100644 --- a/apps/mg_core/test/mg_core_events_stash_SUITE.erl +++ b/apps/mg_core/test/mg_core_events_stash_SUITE.erl @@ -254,7 +254,8 @@ events_machine_options(Options) -> timers => Scheduler, timers_retries => Scheduler, overseer => Scheduler - } + }, + scaling => global_based }, events_storage => {mg_core_storage_memory, #{ diff --git a/apps/mg_core/test/mg_core_instant_timer_task_SUITE.erl b/apps/mg_core/test/mg_core_instant_timer_task_SUITE.erl index 8f46a68..6f566dd 100644 --- a/apps/mg_core/test/mg_core_instant_timer_task_SUITE.erl +++ b/apps/mg_core/test/mg_core_instant_timer_task_SUITE.erl @@ -216,7 +216,8 @@ automaton_options(NS) -> timers => Scheduler, timers_retries => Scheduler, overseer => Scheduler - } + }, + scaling => global_based }. -spec automaton_options_wo_shedulers(mg_core:ns()) -> mg_core_machine:options(). @@ -236,7 +237,8 @@ automaton_options_wo_shedulers(NS) -> pulse => ?MODULE, schedulers => #{ % none - } + }, + scaling => global_based }. -spec handle_beat(_, mg_core_pulse:beat()) -> ok. diff --git a/apps/mg_core/test/mg_core_internal_events_logging_SUITE.erl b/apps/mg_core/test/mg_core_internal_events_logging_SUITE.erl index d91407c..305e859 100644 --- a/apps/mg_core/test/mg_core_internal_events_logging_SUITE.erl +++ b/apps/mg_core/test/mg_core_internal_events_logging_SUITE.erl @@ -142,7 +142,8 @@ automaton_options(NS) -> schedulers => #{ timers => #{min_scan_delay => 1000}, timers_retries => #{min_scan_delay => 1000} - } + }, + scaling => global_based }. -spec handle_beat(_, mg_core_pulse:beat()) -> no_return(). 
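The suites below all receive the same mechanical one-line change because scaling is now a mandatory key (:=) in mg_core_machine:options(), and mg_core_machine:storage_options/1 pattern-matches on it, so an options map without the key would crash at runtime. The minimal shape each suite now builds looks roughly like this sketch (placeholder values in the suites' own style; the exact set of required keys is defined by mg_core_machine:options()):

#{
    namespace => ?NS,
    processor => {?MODULE, #{}},
    storage => mg_core_storage_memory,
    pulse => ?MODULE,
    scaling => global_based
}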
diff --git a/apps/mg_core/test/mg_core_interrupted_SUITE.erl b/apps/mg_core/test/mg_core_interrupted_SUITE.erl index 4ebbcbb..7347f25 100644 --- a/apps/mg_core/test/mg_core_interrupted_SUITE.erl +++ b/apps/mg_core/test/mg_core_interrupted_SUITE.erl @@ -179,7 +179,8 @@ automaton_options(NS, StorageName) -> pulse => ?MODULE, schedulers => #{ overseer => Scheduler - } + }, + scaling => global_based }. -spec handle_beat(_, mg_core_pulse:beat()) -> ok. diff --git a/apps/mg_core/test/mg_core_machine_SUITE.erl b/apps/mg_core/test/mg_core_machine_SUITE.erl index e7f0253..9e75eb9 100644 --- a/apps/mg_core/test/mg_core_machine_SUITE.erl +++ b/apps/mg_core/test/mg_core_machine_SUITE.erl @@ -246,7 +246,8 @@ automaton_options(C) -> timers => Scheduler, timers_retries => Scheduler, overseer => Scheduler - } + }, + scaling => global_based }. -spec handle_beat(_, mg_core_pulse:beat()) -> ok. diff --git a/apps/mg_core/test/mg_core_machine_full_test_SUITE.erl b/apps/mg_core/test/mg_core_machine_full_test_SUITE.erl index 94859fc..6514e50 100644 --- a/apps/mg_core/test/mg_core_machine_full_test_SUITE.erl +++ b/apps/mg_core/test/mg_core_machine_full_test_SUITE.erl @@ -139,7 +139,7 @@ check_chain(Options, ID, ReportPid) -> -spec check_chain(mg_core_machine:options(), id(), seq(), [action()], state(), pid()) -> ok. % TODO: remove these constants -check_chain(_, ID, 100000, _, _, ReportPid) -> +check_chain(_, ID, 10000, _, _, ReportPid) -> ReportPid ! ?CHAIN_COMPLETE(ID, erlang:monotonic_time()), ok; check_chain(Options, ID, Seq, AllActions, State, ReportPid) -> @@ -317,7 +317,8 @@ automaton_options() -> pulse => ?MODULE, storage => mg_core_storage_memory }, - pulse => ?MODULE + pulse => ?MODULE, + scaling => global_based }. -spec lists_random(list(T)) -> T. diff --git a/apps/mg_core/test/mg_core_machine_notification_SUITE.erl b/apps/mg_core/test/mg_core_machine_notification_SUITE.erl index 3c53981..febdbaa 100644 --- a/apps/mg_core/test/mg_core_machine_notification_SUITE.erl +++ b/apps/mg_core/test/mg_core_machine_notification_SUITE.erl @@ -309,7 +309,8 @@ automaton_options(_C) -> scan_handicap => 1, reschedule_time => 2 } - } + }, + scaling => global_based }. -spec notification_options() -> mg_core_notification:options(). @@ -317,7 +318,8 @@ notification_options() -> #{ namespace => ?NS, pulse => ?MODULE, - storage => mg_core_storage_memory + storage => mg_core_storage_memory, + scaling => global_based }. -spec handle_beat(_, mg_core_pulse:beat()) -> ok. diff --git a/apps/mg_core/test/mg_core_timer_retry_SUITE.erl b/apps/mg_core/test/mg_core_timer_retry_SUITE.erl index 82ffb63..58a0942 100644 --- a/apps/mg_core/test/mg_core_timer_retry_SUITE.erl +++ b/apps/mg_core/test/mg_core_timer_retry_SUITE.erl @@ -216,7 +216,8 @@ automaton_options(NS, RetryPolicy) -> timers => Scheduler, timers_retries => Scheduler, overseer => Scheduler - } + }, + scaling => global_based }. -spec handle_beat(_, mg_core_pulse:beat()) -> ok. diff --git a/apps/mg_core/test/mg_core_union_SUITE.erl b/apps/mg_core/test/mg_core_union_SUITE.erl deleted file mode 100644 index 3c8e285..0000000 --- a/apps/mg_core/test/mg_core_union_SUITE.erl +++ /dev/null @@ -1,142 +0,0 @@ --module(mg_core_union_SUITE). - --include_lib("eunit/include/eunit.hrl"). - -%% API --export([ - init_per_suite/1, - end_per_suite/1, - all/0, - groups/0 -]). - --define(RECONNECT_TIMEOUT, 1000).
--define(CLUSTER_OPTS, #{ - discovery => #{ - module => mg_core_union, - options => #{ - <<"domain_name">> => <<"localhost">>, - <<"sname">> => <<"test_node">> - } - }, - reconnect_timeout => ?RECONNECT_TIMEOUT -}). - --export([nxdomain_test/1]). --export([start_ok_test/1]). --export([unknown_nodedown_test/1]). --export([exists_nodedown_test/1]). --export([unknown_nodeup_test/1]). --export([exists_nodeup_test/1]). --export([cluster_size_test/1]). - --type config() :: [{atom(), term()}]. --type test_case_name() :: atom(). --type group_name() :: atom(). --type test_result() :: any() | no_return(). - --spec init_per_suite(_) -> _. -init_per_suite(Config) -> - Config. - --spec end_per_suite(_) -> _. -end_per_suite(_Config) -> - ok. - --spec test() -> _. - --spec all() -> [{group, test_case_name()}]. -all() -> - [{group, basic_operations}]. - --spec groups() -> [{group_name(), list(), [test_case_name()]}]. -groups() -> - [ - {discovery, [], [ - nxdomain_test - ]}, - {basic_operations, [], [ - start_ok_test, - unknown_nodedown_test, - exists_nodedown_test, - unknown_nodeup_test, - exists_nodeup_test, - cluster_size_test - ]} - ]. - --spec nxdomain_test(config()) -> test_result(). -nxdomain_test(_Config) -> - ?assertError( - {resolve_error, {error, nxdomain}}, - mg_core_union:discovery(#{<<"domain_name">> => <<"bad_name">>, <<"sname">> => <<"mg">>}) - ). - --spec start_ok_test(config()) -> test_result(). -start_ok_test(_Config) -> - {ok, Pid} = mg_core_union:start_link(?CLUSTER_OPTS), - State = await_sys_get_state(Pid), - #{known_nodes := ListNodes} = State, - lists:foreach( - fun(Node) -> - ?assertEqual(pong, net_adm:ping(Node)) - end, - ListNodes - ), - exit(Pid, normal). - --spec unknown_nodedown_test(config()) -> test_result(). -unknown_nodedown_test(_Config) -> - {ok, Pid} = mg_core_union:start_link(?CLUSTER_OPTS), - nodedown_check(Pid, 'foo@127.0.0.1'), - exit(Pid, normal). - --spec exists_nodedown_test(config()) -> test_result(). -exists_nodedown_test(_Config) -> - {ok, Pid} = mg_core_union:start_link(?CLUSTER_OPTS), - nodedown_check(Pid, node()), - exit(Pid, normal). - --spec unknown_nodeup_test(config()) -> test_result(). -unknown_nodeup_test(_Config) -> - {ok, Pid} = mg_core_union:start_link(?CLUSTER_OPTS), - State = await_sys_get_state(Pid), - mg_core_union:set_state(State#{known_nodes => []}), - Pid ! {nodeup, node()}, - #{known_nodes := List} = await_sys_get_state(Pid), - ?assertEqual(List, [node()]), - exit(Pid, normal). - --spec exists_nodeup_test(config()) -> test_result(). -exists_nodeup_test(_Config) -> - {ok, Pid} = mg_core_union:start_link(?CLUSTER_OPTS), - #{known_nodes := List1} = await_sys_get_state(Pid), - ?assertEqual(List1, [node()]), - Pid ! {nodeup, node()}, - #{known_nodes := List2} = await_sys_get_state(Pid), - ?assertEqual(List2, [node()]), - exit(Pid, normal). - --spec cluster_size_test(config()) -> test_result(). -cluster_size_test(_Config) -> - _ = os:putenv("REPLICA_COUNT", "3"), - ?assertEqual(3, mg_core_union:cluster_size()), - {ok, Pid} = mg_core_union:start_link(?CLUSTER_OPTS), - ?assertEqual(1, mg_core_union:cluster_size()), - exit(Pid, normal). - -%% Internal functions --spec nodedown_check(pid(), node()) -> _. -nodedown_check(Pid, Node) -> - #{known_nodes := ListNodes1} = await_sys_get_state(Pid), - Pid ! {nodedown, Node}, - timer:sleep(?RECONNECT_TIMEOUT + 10), - #{known_nodes := ListNodes2} = await_sys_get_state(Pid), - ?assertEqual(ListNodes1, ListNodes2). - --spec await_sys_get_state(pid()) -> any(). 
-await_sys_get_state(Pid) -> - case sys:get_state(Pid, 100) of - {error, _} -> await_sys_get_state(Pid); - State -> State - end. diff --git a/apps/mg_cth/include/mg_cth.hrl b/apps/mg_cth/include/mg_cth.hrl index 9f59eac..b1d998b 100644 --- a/apps/mg_cth/include/mg_cth.hrl +++ b/apps/mg_cth/include/mg_cth.hrl @@ -2,6 +2,7 @@ -define(__mg_cth__, 42). -define(NS, <<"NS">>). +-define(EVENT_SINK_NS, <<"_event_sinks">>). -define(ID, <<"ID">>). -define(EMPTY_ID, <<"">>). -define(ES_ID, <<"test_event_sink_2">>). diff --git a/apps/mg_cth/src/mg_cth_cluster.erl b/apps/mg_cth/src/mg_cth_cluster.erl new file mode 100644 index 0000000..20d1f93 --- /dev/null +++ b/apps/mg_cth/src/mg_cth_cluster.erl @@ -0,0 +1,120 @@ +-module(mg_cth_cluster). + +%% API +-export([ + prepare_cluster/1, + instance_up/1, + instance_down/1, + await_peer/2, + await_peer/1 +]). + +-define(ERLANG_TEST_HOSTS, [ + "mg-0", + "mg-1", + "mg-2", + "mg-3", + "mg-4" +]). + +-define(HOSTS_TEMPLATE, << + "127.0.0.1 localhost\n", + "::1 localhost ip6-localhost ip6-loopback\n", + "fe00::0 ip6-localnet\n", + "ff00::0 ip6-mcastprefix\n", + "ff02::1 ip6-allnodes\n", + "ff02::2 ip6-allrouters\n" +>>). + +-define(DEFAULT_ATTEMPTS, 5). + +-spec prepare_cluster(_) -> _. +prepare_cluster(HostsToUp) -> + HostsTable = hosts_table(), + %% prepare headless emulation records + HeadlessRecords = lists:foldl( + fun(Host, Acc) -> + Address = maps:get(Host, HostsTable), + <<Acc/binary, Address/binary, " ", Host/binary, "\n">> + end, + <<"\n">>, + HostsToUp + ), + + %% prepare hosts files for each node + lists:foreach( + fun(Host) -> + Address = maps:get(Host, HostsTable), + Payload = << + ?HOSTS_TEMPLATE/binary, + Address/binary, + " ", + Host/binary, + "\n", + HeadlessRecords/binary + >>, + Filename = unicode:characters_to_list(Host) ++ "_hosts", + ok = file:write_file(Filename, Payload) + end, + HostsToUp + ), + + %% distribute hosts files + lists:foreach( + fun + (<<"mg-0">>) -> + LocalFile = "mg-0_hosts", + RemoteFile = "/etc/hosts", + Cp = os:find_executable("cp"), + CMD = Cp ++ " -f " ++ LocalFile ++ " " ++ RemoteFile, + os:cmd(CMD); + (Host) -> + HostString = unicode:characters_to_list(Host), + LocalFile = HostString ++ "_hosts", + RemoteFile = HostString ++ ":/etc/hosts", + SshPass = os:find_executable("sshpass"), + Scp = os:find_executable("scp"), + CMD = SshPass ++ " -p security " ++ Scp ++ " " ++ LocalFile ++ " " ++ RemoteFile, + os:cmd(CMD) + end, + HostsToUp + ). + +-spec instance_up(_) -> _. +instance_up(Host) -> + Ssh = os:find_executable("ssh"), + CMD = Ssh ++ " " ++ unicode:characters_to_list(Host) ++ " /opt/mg/bin/entrypoint.sh", + spawn(fun() -> os:cmd(CMD) end). + +-spec instance_down(_) -> _. +instance_down(Host) -> + Ssh = os:find_executable("ssh"), + CMD = Ssh ++ " " ++ unicode:characters_to_list(Host) ++ " /opt/mg/bin/mg stop", + spawn(fun() -> os:cmd(CMD) end). + +-spec await_peer(_) -> _. +await_peer(RemoteNode) -> + await_peer(RemoteNode, ?DEFAULT_ATTEMPTS). + +-spec await_peer(_, _) -> _. +await_peer(_RemoteNode, 0) -> + error(peer_not_started); +await_peer(RemoteNode, Attempt) -> + case net_adm:ping(RemoteNode) of + pong -> + ok; + pang -> + timer:sleep(1000), + await_peer(RemoteNode, Attempt - 1) + end. + +-spec hosts_table() -> _. +hosts_table() -> + lists:foldl( + fun(Host, Acc) -> + {ok, Addr} = inet:getaddr(Host, inet), + Acc#{unicode:characters_to_binary(Host) => unicode:characters_to_binary(inet:ntoa(Addr))} + end, + #{}, + ?ERLANG_TEST_HOSTS + ).
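Taken together, the helper above emulates a headless-DNS cluster for common tests: prepare_cluster/1 resolves every known host, renders per-host /etc/hosts files and pushes them out over scp, while instance_up/1, instance_down/1 and await_peer/1,2 manage the remote releases over ssh. A hypothetical usage sketch follows; the host list matches ?ERLANG_TEST_HOSTS, but the peer node names are assumptions, since the real ones are derived from container IPs by the test entrypoint:

%% Hypothetical helper, not part of the diff: run Fun with two extra
%% instances up, then always tear them down again.
with_peers_example(Fun) ->
    Hosts = [<<"mg-0">>, <<"mg-1">>, <<"mg-2">>],
    ok = mg_cth_cluster:prepare_cluster(Hosts),
    lists:foreach(fun mg_cth_cluster:instance_up/1, tl(Hosts)),
    %% assumed node names; await_peer/1 retries net_adm:ping/1 up to 5 times
    ok = mg_cth_cluster:await_peer('mg@mg-1'),
    ok = mg_cth_cluster:await_peer('mg@mg-2'),
    try
        Fun()
    after
        lists:foreach(fun mg_cth_cluster:instance_down/1, tl(Hosts))
    end.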
diff --git a/apps/mg_cth/src/mg_cth_configurator.erl b/apps/mg_cth/src/mg_cth_configurator.erl index 8e8a4ba..8059714 100644 --- a/apps/mg_cth/src/mg_cth_configurator.erl +++ b/apps/mg_cth/src/mg_cth_configurator.erl @@ -18,13 +18,17 @@ schedulers := mg_core_machine:schedulers_opt(), default_processing_timeout := timeout(), suicide_probability => mg_core_machine:suicide_probability(), - event_stash_size := non_neg_integer() + event_stash_size := non_neg_integer(), + scaling => mg_core_cluster:scaling_type(), + _ => _ }. -type config() :: #{ woody_server := mg_woody:woody_server(), namespaces := #{mg_core:ns() => events_machines()}, - quotas => [mg_core_quota_worker:options()] + quotas => [mg_core_quota_worker:options()], + cluster => mg_core_cluster:cluster_options(), + _ => _ }. -type processor() :: mg_woody_processor:options(). @@ -34,6 +38,8 @@ construct_child_specs(undefined) -> []; construct_child_specs(#{woody_server := WoodyServer, namespaces := Namespaces} = Config) -> Quotas = maps:get(quotas, Config, []), + ClusterOpts = maps:get(cluster, Config, #{}), + Scaling = maps:get(scaling, ClusterOpts, global_based), QuotasChSpec = quotas_child_specs(Quotas, quota), EventMachinesChSpec = events_machines_child_specs(Namespaces), @@ -41,14 +47,16 @@ construct_child_specs(#{woody_server := WoodyServer, namespaces := Namespaces} = woody_server, #{ woody_server => WoodyServer, - automaton => api_automaton_options(Namespaces), + automaton => api_automaton_options(Namespaces, #{scaling => Scaling}), pulse => mg_cth_pulse } ), + ClusterSpec = mg_core_cluster:child_spec(ClusterOpts), lists:flatten([ WoodyServerChSpec, QuotasChSpec, + ClusterSpec, EventMachinesChSpec ]). @@ -91,7 +99,8 @@ machine_options(NS, Config) -> Options = maps:with( [ retries, - timer_processing_timeout + timer_processing_timeout, + scaling ], Config ), @@ -112,8 +121,8 @@ machine_options(NS, Config) -> suicide_probability => maps:get(suicide_probability, Config, undefined) }. --spec api_automaton_options(_) -> mg_woody_automaton:options(). -api_automaton_options(NSs) -> +-spec api_automaton_options(_, _Opts) -> mg_woody_automaton:options(). +api_automaton_options(NSs, Opts) -> maps:fold( fun(NS, ConfigNS, Options) -> Options#{ @@ -125,7 +134,7 @@ api_automaton_options(NSs) -> ) } end, - #{}, + Opts, NSs ). diff --git a/apps/mg_cth/src/mg_cth_neighbour.erl b/apps/mg_cth/src/mg_cth_neighbour.erl new file mode 100644 index 0000000..95bdf63 --- /dev/null +++ b/apps/mg_cth/src/mg_cth_neighbour.erl @@ -0,0 +1,67 @@ +-module(mg_cth_neighbour). + +-behaviour(gen_server). + +-export([start/0]). +-export([ + init/1, + handle_call/3, + handle_cast/2, + handle_info/2, + terminate/2, + code_change/3 +]). + +-export([connecting/1]). +-export([echo/1]). + +-define(SERVER, ?MODULE). + +-record(state, {}). + +-type state() :: #state{}. + +-spec connecting(_) -> _. +connecting(RemoteData) -> + gen_server:call(?MODULE, {connecting, RemoteData}). + +-spec echo(_) -> _. +echo(Msg) -> + gen_server:call(?MODULE, {echo, Msg}). + +%%%=================================================================== +%%% Spawning and gen_server implementation +%%%=================================================================== +-spec start() -> _. +start() -> + gen_server:start({local, ?SERVER}, ?MODULE, [], []). + +-spec init(_) -> {ok, state()}. +init([]) -> + {ok, #state{}}. + +-spec handle_call(term(), {pid(), _}, state()) -> {reply, any(), state()}. 
+handle_call({echo, Echo}, _From, State = #state{}) -> + {reply, {ok, Echo}, State}; +handle_call({connecting, _RemoteData}, _From, State = #state{}) -> + {reply, {ok, #{1 => node()}}, State}. + +-spec handle_cast(term(), state()) -> {noreply, state()}. +handle_cast(_Request, State = #state{}) -> + {noreply, State}. + +-spec handle_info(term(), state()) -> {noreply, state()}. +handle_info(_Info, State = #state{}) -> + {noreply, State}. + +-spec terminate(_Reason, state()) -> ok. +terminate(_Reason, _State = #state{}) -> + ok. + +-spec code_change(_OldVsn, state(), _Extra) -> {ok, state()}. +code_change(_OldVsn, State = #state{}, _Extra) -> + {ok, State}. + +%%%=================================================================== +%%% Internal functions +%%%=================================================================== diff --git a/apps/mg_woody/src/mg_woody_automaton.erl b/apps/mg_woody/src/mg_woody_automaton.erl index 681b51c..3f29772 100644 --- a/apps/mg_woody/src/mg_woody_automaton.erl +++ b/apps/mg_woody/src/mg_woody_automaton.erl @@ -29,7 +29,10 @@ -import(mg_woody_packer, [pack/2, unpack/2]). %% API types --type options() :: #{mg_core:ns() => ns_options()}. +-type options() :: #{ + mg_core:ns() => ns_options(), + scaling => mg_core_cluster:scaling_type() +}. -type ns_options() :: #{ machine := mg_core_events_machine:options(), modernizer => mg_core_events_modernizer:options() @@ -51,6 +54,8 @@ %% -spec handler(options()) -> mg_woody_utils:woody_handler(). +handler(#{scaling := partition_based} = Options) -> + {"/v1/automaton", {{mg_proto_state_processing_thrift, 'Automaton'}, {mg_woody_automaton_balancer, Options}}}; handler(Options) -> {"/v1/automaton", {{mg_proto_state_processing_thrift, 'Automaton'}, {?MODULE, Options}}}. @@ -106,6 +111,7 @@ handle_function('Repair', {MachineDesc, Args}, WoodyContext, Options) -> {ok, Reply} -> {ok, pack(repair_response, Reply)}; {error, {failed, Reason}} -> + %% TODO catch this in balancer!!! woody_error:raise(business, pack(repair_error, Reason)) end; handle_function('SimpleRepair', {NS, RefIn}, WoodyContext, Options) -> diff --git a/apps/mg_woody/src/mg_woody_automaton_balancer.erl b/apps/mg_woody/src/mg_woody_automaton_balancer.erl new file mode 100644 index 0000000..1f78745 --- /dev/null +++ b/apps/mg_woody/src/mg_woody_automaton_balancer.erl @@ -0,0 +1,74 @@ +-module(mg_woody_automaton_balancer). + +%% woody handler +-behaviour(woody_server_thrift_handler). +-export([handle_function/4]). + +%% reduce boilerplate +-import(mg_woody_packer, [unpack/2]). + +%% API types +-type options() :: mg_woody_automaton:options(). + +-define(AUTOMATON_HANDLER, mg_woody_automaton). + +%% +%% woody handler +%% +-spec handle_function(woody:func(), woody:args(), woody_context:ctx(), options()) -> + {ok, _Result} | no_return().
+ +handle_function('Start' = Call, {NS, IDIn, _Args} = Data, WoodyContext, Options) -> + ID = unpack(id, IDIn), + erpc:call( + target_node({NS, ID}), + ?AUTOMATON_HANDLER, + handle_function, + [Call, Data, WoodyContext, Options] + ); +handle_function(Call, {MachineDesc, _Args} = Data, WoodyContext, Options) when + Call =:= 'Repair'; + Call =:= 'Call'; + Call =:= 'Notify' +-> + {NS, ID, _Range} = unpack(machine_descriptor, MachineDesc), + erpc:call( + target_node({NS, ID}), + ?AUTOMATON_HANDLER, + handle_function, + [Call, Data, WoodyContext, Options] + ); +handle_function('SimpleRepair' = Call, {NS, RefIn} = Data, WoodyContext, Options) -> + ID = unpack(ref, RefIn), + erpc:call( + target_node({NS, ID}), + ?AUTOMATON_HANDLER, + handle_function, + [Call, Data, WoodyContext, Options] + ); +handle_function(Call, {MachineDesc} = Data, WoodyContext, Options) when + Call =:= 'GetMachine'; + Call =:= 'Modernize' +-> + {NS, ID, _Range} = unpack(machine_descriptor, MachineDesc), + erpc:call( + target_node({NS, ID}), + ?AUTOMATON_HANDLER, + handle_function, + [Call, Data, WoodyContext, Options] + ); +handle_function('Remove' = Call, {NS, IDIn} = Data, WoodyContext, Options) -> + ID = unpack(id, IDIn), + erpc:call( + target_node({NS, ID}), + ?AUTOMATON_HANDLER, + handle_function, + [Call, Data, WoodyContext, Options] + ). + +%% internal functions + +-spec target_node(term()) -> node(). +target_node(BalancingKey) -> + {ok, Node} = mg_core_cluster:get_node(BalancingKey), + Node. diff --git a/apps/mg_woody/test/mg_modernizer_tests_SUITE.erl b/apps/mg_woody/test/mg_modernizer_tests_SUITE.erl index f08743f..74f6107 100644 --- a/apps/mg_woody/test/mg_modernizer_tests_SUITE.erl +++ b/apps/mg_woody/test/mg_modernizer_tests_SUITE.erl @@ -168,7 +168,7 @@ mg_woody_config(Name, C) -> existing_storage_name => ?config(storage_name, C) }}, processor => #{ - url => <<"http://localhost:8023/processor">>, + url => <<"http://mg-0:8023/processor">>, transport_opts => #{pool => ns, max_connections => 100} }, default_processing_timeout => 5000, @@ -176,6 +176,7 @@ mg_woody_config(Name, C) -> timers => #{} }, retries => #{}, + scaling => global_based, event_stash_size => 0 }, case Name of @@ -185,7 +186,7 @@ mg_woody_config(Name, C) -> #{ modernizer => #{ current_format_version => ?MODERN_FMT_VSN, - handler => #{url => <<"http://localhost:8023/modernizer">>} + handler => #{url => <<"http://mg-0:8023/modernizer">>} } } end diff --git a/apps/mg_woody/test/mg_stress_SUITE.erl b/apps/mg_woody/test/mg_stress_SUITE.erl index 29bf3aa..090851e 100644 --- a/apps/mg_woody/test/mg_stress_SUITE.erl +++ b/apps/mg_woody/test/mg_stress_SUITE.erl @@ -112,7 +112,7 @@ mg_woody_config(_C) -> ?NS => #{ storage => mg_core_storage_memory, processor => #{ - url => <<"http://localhost:8023/processor">>, + url => <<"http://mg-0:8023/processor">>, transport_opts => #{pool => ns, max_connections => 100} }, default_processing_timeout => 5000, @@ -121,6 +121,7 @@ mg_woody_config(_C) -> }, retries => #{}, event_sinks => [], + scaling => global_based, event_stash_size => 10 } } diff --git a/apps/mg_woody/test/mg_woody_tests_SUITE.erl b/apps/mg_woody/test/mg_woody_tests_SUITE.erl index f317aab..5b6c690 100644 --- a/apps/mg_woody/test/mg_woody_tests_SUITE.erl +++ b/apps/mg_woody/test/mg_woody_tests_SUITE.erl @@ -74,6 +74,49 @@ -export([config_with_multiple_event_sinks/1]). 
+-define(CLUSTER_PARTITION_OPTS, #{ + discovering => #{ + <<"domain_name">> => <<"machinegun-ha-headless">>, + <<"sname">> => <<"mg">> + }, + scaling => partition_based, + partitioning => #{ + capacity => 5, + max_hash => 4095 + }, + reconnect_timeout => 5000 +}). + +-define(CLUSTER_GLOBAL_OPTS, #{ + discovering => #{ + <<"domain_name">> => <<"machinegun-ha-headless">>, + <<"sname">> => <<"mg">> + }, + scaling => global_based, + reconnect_timeout => 5000 +}). + +-define(STORAGE_RIAK(NS), + {mg_core_storage_riak, #{ + host => "riakdb", + port => 8087, + bucket => NS, + connect_timeout => 5000, + request_timeout => 5000, + index_query_timeout => 10000, + pool_options => #{ + init_count => 0, + max_count => 100, + idle_timeout => 60000, + cull_interval => 10000, + queue_max => 1000 + }, + batching => #{ + concurrency_limit => 100 + }, + sidecar => {mg_riak_prometheus, #{}} + }} +). %% %% tests descriptions %% @@ -84,17 +127,40 @@ -spec all() -> [test_name() | {group, group_name()}]. all() -> [ - {group, base}, - {group, history}, - {group, repair}, - {group, timers}, - {group, deadline}, + {group, standalone_memory}, + {group, standalone_memory_history}, + {group, standalone_riak}, + {group, distributed_riak}, config_with_multiple_event_sinks ]. -spec groups() -> [{group_name(), list(_), [test_name()]}]. groups() -> [ + {standalone_memory, [], [ + {group, base}, + {group, repair}, + {group, timers}, + {group, deadline} + ]}, + {standalone_memory_history, [], [ + {group, history} + ]}, + {standalone_riak, [], [ + {group, base}, + {group, history}, + {group, repair}, + {group, timers}, + {group, deadline} + ]}, + {distributed_riak, [], [ + {group, base}, + {group, history}, + {group, repair}, + {group, timers} + %% {group, deadline} + ]}, + % TODO check timer cancellation {base, [sequence], [ namespace_not_found, @@ -175,14 +241,60 @@ end_per_suite(_C) -> ok. -spec init_per_group(group_name(), config()) -> config(). -init_per_group(history, C) -> + [ + {storage_ns, mg_core_storage_memory}, + {storage_evs, mg_core_storage_memory}, + {scaling, global_based}, + {worker, #{registry => mg_core_procreg_global}}, + {registry, mg_core_procreg_global}, + {cluster, #{}} + | C + ]; +init_per_group(standalone_memory, C) -> % NOTE % Even such a small chance can fire in situations where the processor reads out % a large chunk of machine history, which makes the real probability of failing % the operation equal to (1 - (1 - p) ^ n). - init_per_group([{storage, {mg_core_storage_memory, #{random_transient_fail => 0.01}}} | C]).
+ [ + {storage_ns, {mg_core_storage_memory, #{random_transient_fail => 0.01}}}, + {storage_evs, mg_core_storage_memory}, + {scaling, global_based}, + {worker, #{registry => mg_core_procreg_global}}, + {registry, mg_core_procreg_global}, + {cluster, #{}} + | C + ]; +init_per_group(standalone_riak, C) -> + [ + {storage_ns, ?STORAGE_RIAK(?NS)}, + {storage_evs, ?STORAGE_RIAK(<<"_event_sinks">>)}, + {scaling, global_based}, + {worker, #{registry => mg_core_procreg_global}}, + {registry, mg_core_procreg_global}, + {cluster, #{}} + | C + ]; +init_per_group(distributed_riak, C) -> + ClusterHosts = [<<"mg-0">>, <<"mg-1">>, <<"mg-2">>], + _ = mg_cth_cluster:prepare_cluster([<<"mg-0">>, <<"mg-1">>, <<"mg-2">>]), + ok = lists:foreach( + fun(Host) -> mg_cth_cluster:instance_up(Host) end, + tl(ClusterHosts) + ), + %% wait for peers + timer:sleep(3000), + [ + {storage_ns, ?STORAGE_RIAK(?NS)}, + {storage_evs, ?STORAGE_RIAK(<<"_event_sinks">>)}, + {scaling, partition_based}, + {worker, #{registry => mg_core_procreg_gproc}}, + {registry, mg_core_procreg_gproc}, + {cluster, ?CLUSTER_PARTITION_OPTS} + | C + ]; +init_per_group(_, C) -> + init_per_group(C). -spec init_per_group(config()) -> config(). init_per_group(C) -> @@ -211,7 +323,7 @@ init_per_group(C) -> [ {apps, Apps}, {automaton_options, #{ - url => "http://localhost:8022", + url => "http://mg-0:8022", ns => ?NS, retry_strategy => genlib_retry:linear(3, 1) }}, @@ -310,11 +422,12 @@ mg_woody_config(C) -> update_interval => 100 } ], + cluster => ?config(cluster, C), namespaces => #{ ?NS => #{ - storage => ?config(storage, C), + storage => ?config(storage_ns, C), processor => #{ - url => <<"http://localhost:8023/processor">>, + url => <<"http://mg-0:8023/processor">>, transport_opts => #{pool => ns, max_connections => 100} }, default_processing_timeout => 5000, @@ -326,6 +439,9 @@ mg_woody_config(C) -> storage => {exponential, infinity, 1, 10}, timers => {exponential, infinity, 1, 10} }, + scaling => ?config(scaling, C), + registry => ?config(registry, C), + worker => ?config(worker, C), % there are currently issues that prevent enabling this option % permanently (and we would really like to, so as to verify that idempotent retries work) % TODO do this in the future @@ -343,10 +459,38 @@ mg_woody_config(C) -> }. -spec end_per_group(group_name(), config()) -> ok. +end_per_group(distributed_riak, C) -> + _ = mg_cth_cluster:prepare_cluster([<<"mg-0">>]), + ok = lists:foreach( + fun(Host) -> mg_cth_cluster:instance_down(Host) end, + [<<"mg-1">>, <<"mg-2">>] + ), + maybe_drop_buckets(C); end_per_group(_, C) -> - ok = proc_lib:stop(?config(processor_pid, C)), - mg_cth:stop_applications(?config(apps, C)). + maybe_drop_buckets(C), + try + proc_lib:stop(?config(processor_pid, C)), + mg_cth:stop_applications(?config(apps, C)) + catch + _:_ -> + ok + end. +-spec maybe_drop_buckets(_) -> _. +maybe_drop_buckets(_C) -> + {ok, Pid} = riakc_pb_socket:start_link("riakdb", 8087), + {ok, ListBuckets} = riakc_pb_socket:list_buckets(Pid, [{allow_listing, true}]), + lists:foreach( + fun(Bucket) -> + case riakc_pb_socket:list_keys(Pid, Bucket, [{allow_listing, true}]) of + {ok, Keys} -> + lists:foreach(fun(Key) -> ok = riakc_pb_socket:delete(Pid, Bucket, Key) end, Keys); + _ -> + skip + end + end, + ListBuckets + ).
%% %% base group tests %% @@ -582,7 +726,22 @@ timeout_call_with_deadline(C) -> DeadlineFn = fun() -> mg_core_deadline:from_timeout(?DEADLINE_TIMEOUT) end, Options0 = no_timeout_automaton_options(C), Options1 = maps:remove(retry_strategy, Options0), - {'EXIT', {{woody_error, {external, result_unknown, <<"{timeout", _/binary>>}}, _Stack}} = + % {'EXIT', {{woody_error, {external, result_unknown, <<"{timeout", _/binary>>}}, _Stack}} = + % { + % 'EXIT', + % { + % { + % woody_error, + % { + % external, + % result_unexpected, + % <<"error:{exception,{woody_error,{internal,result_unknown,<<\"{timeout", _/binary>> + % } + % }, + % _Stack + % } + % } = + {'EXIT', {{woody_error, _}, _Stack}} = (catch mg_cth_automaton_client:call(Options1, ?ID, <<"sleep">>, DeadlineFn())), #mg_stateproc_MachineAlreadyWorking{} = (catch mg_cth_automaton_client:repair(Options0, ?ID, <<"ok">>, DeadlineFn())). @@ -603,7 +762,7 @@ config_with_multiple_event_sinks(_C) -> <<"1">> => #{ storage => mg_core_storage_memory, processor => #{ - url => <<"http://localhost:8023/processor">>, + url => <<"http://mg-0:8023/processor">>, transport_opts => #{pool => pool1, max_connections => 100} }, default_processing_timeout => 30000, @@ -612,6 +771,9 @@ config_with_multiple_event_sinks(_C) -> overseer => #{} }, retries => #{}, + scaling => global_based, + registry => mg_core_procreg_global, + worker => #{registry => mg_core_procreg_global}, event_sinks => [ {mg_core_events_sink_kafka, #{ name => kafka, @@ -623,7 +785,7 @@ config_with_multiple_event_sinks(_C) -> <<"2">> => #{ storage => mg_core_storage_memory, processor => #{ - url => <<"http://localhost:8023/processor">>, + url => <<"http://mg-0:8023/processor">>, transport_opts => #{pool => pool2, max_connections => 100} }, default_processing_timeout => 5000, @@ -632,6 +794,9 @@ config_with_multiple_event_sinks(_C) -> overseer => #{} }, retries => #{}, + scaling => global_based, + registry => mg_core_procreg_global, + worker => #{registry => mg_core_procreg_global}, event_sinks => [ {mg_core_events_sink_kafka, #{ name => kafka_other, diff --git a/compose.yaml b/compose.yaml index 3e4c531..6f5d111 100644 --- a/compose.yaml +++ b/compose.yaml @@ -2,6 +2,10 @@ services: testrunner: image: $DEV_IMAGE_TAG + environment: + WORK_DIR: $PWD + OTEL_TRACES_EXPORTER: none + OTEL_TRACES_SAMPLER: always_off build: dockerfile: Dockerfile.dev context: . @@ -10,13 +14,10 @@ services: THRIFT_VERSION: $THRIFT_VERSION volumes: - .:$PWD - hostname: $SERVICE_NAME + hostname: ${SERVICE_NAME}-0 cap_add: - NET_ADMIN working_dir: $PWD - environment: - OTEL_TRACES_EXPORTER: none - OTEL_TRACES_SAMPLER: always_off depends_on: riakdb: condition: service_started @@ -30,10 +31,44 @@ services: condition: service_healthy kafka3: condition: service_healthy + + distrunner1: + condition: service_started + distrunner2: + condition: service_started + distrunner3: + condition: service_started + distrunner4: + condition: service_started ports: - "8022" command: /sbin/init + distrunner1: &mg-cluster + image: distrunner-test + # scale: 2 + hostname: ${SERVICE_NAME}-1 + build: + dockerfile: Dockerfile.test + context: . 
+ args: + OTP_VERSION: $OTP_VERSION + THRIFT_VERSION: $THRIFT_VERSION + cap_add: + - NET_ADMIN + + distrunner2: + <<: *mg-cluster + hostname: ${SERVICE_NAME}-2 + + distrunner3: + <<: *mg-cluster + hostname: ${SERVICE_NAME}-3 + + distrunner4: + <<: *mg-cluster + hostname: ${SERVICE_NAME}-4 + riakdb: &member-node image: docker.io/basho/riak-kv:${RIAK_VERSION} environment: diff --git a/config/config.example.yaml b/config/config.example.yaml new file mode 100644 index 0000000..068675e --- /dev/null +++ b/config/config.example.yaml @@ -0,0 +1,398 @@ +# +# Copyright 2020 RBKmoney +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +### This is the one and only file to configure the machinegun service. +### +### Configuration is specific to a single instance of machinegun; it's up to +### you to ensure that each instance is configured properly. This usually boils +### down to sharing namespaces configuration between instances and assigning +### proper nodenames so that instances can see and communicate with each +### other over Erlang distribution. +### +### If you find some configuration knobs are missing here, do not hesitate to +### send a PR. +### +### Notes: +### +### * Configuration strings support environment variable interpolation. +### +### The syntax is `${VARNAME}` where `VARNAME` is the name of the referenced +### environment variable. If referenced, the variable MUST be defined, +### otherwise it's a configuration error. There is no special handling of +### variable values: an empty variable, for example, will be interpolated as +### an empty string. +### +### For example, given `HOST_IP=10.0.42.42` one could define: +### dist_node_name: machinegun@${HOST_IP} +### +### * Graceful shutdowns. +### +### There are multiple `shutdown_timeout` parameters defined in this example config: +### * one for the woody server +### * one for each machinegun namespace +### * and two for consuela presence and registry +### To calculate the actual maximum time this service takes to shut down gracefully +### you need to take the woody server `shutdown_timeout` parameter, add the maximum +### value between all of the `shutdown_timeout` parameters defined for each namespace, +### and then add both of the consuela `shutdown_timeout`s: +### +### max_shutdown_time = +### woody_server.shutdown_timeout + +### max(namespaces[].shutdown_timeout) + +### consuela.presence.shutdown_timeout + +### consuela.registry.shutdown_timeout +### +### + +# Name of the service. +# +# Defaults to: 'machinegun'. +service_name: machinegun + +# Name of the node for Erlang distribution. +# +# Defaults to: '{service_name}@{hostname}'.
+# Examples: +# * with a short node name: +# dist_node_name: machinegun +# * with a fixed ip for the host part: +# dist_node_name: machinegun@10.0.0.42 +# * for `machinegun@{primary_netif_ip}`, with the latter determined at start time: +# dist_node_name: +# hostpart: ip +# * for `blarg@{fqdn}`, if the latter is available at start time: +# dist_node_name: +# namepart: blarg +# hostpart: fqdn +dist_node_name: + hostpart: hostname + +# Mode and allocation of ports for Erlang distribution. +# +# No defaults here, default behaviour is dictated by ERTS. +# Examples: +# * disable EPMD altogether and set a fixed port both for listening and +# communicating with remote nodes: +# dist_port: +# mode: static +# port: 31337 +dist_port: + mode: epmd + # Which ports to pick from when setting up a distribution listener? + range: [31337, 31340] + +# Erlang VM options. +erlang: + # Path to a file which holds the Erlang distribution cookie. + # The cookie is a _sensitive_ piece of information, so handle it with caution. + # + # Must be set, there's no default. + secret_cookie_file: "config/cookie" + ipv6: true + disable_dns_cache: false + +# Opentelemetry settings +# By default opentelemetry is disabled, which is equivalent to +# "opentelemetry: disabled" +opentelemetry: + # TODO Describe sampling + # Name of the service to use in recording machinegun's spans + service_name: machinegun + # For now, spans are always processed in batches. + # We support only the "otlp" traces exporter + exporter: + # Supports only "http/protobuf" or "grpc" + protocol: http/protobuf + endpoint: http://jaeger:4318 + +# API server options. +woody_server: + ip: "::" + port: 8022 + http_keep_alive_timeout: 60s + shutdown_timeout: 0s # woody server shutdown timeout (see notes above) + +# Distributed machine registry settings. +# +# Do not set if you plan to run machinegun in a non-distributed fashion, +# for example in development or testing scenarios. +consuela: + presence: + check_interval: 5s + shutdown_timeout: 5s + tags: + - production + registry: + nodename: mhost1 + session_ttl: 30s + session_renewal_interval: 10s + shutdown_timeout: 5s + discovery: {} + +# Consul client settings. +# +# Required when the distributed machine registry is enabled.
+consul: + url: http://localhost:8500 + acl_token_file: config/consul.token + connect_timeout: 200ms + recv_timeout: 1s + +# New cluster assembler (instead of consuela). +# It will be used if consuela is undefined. +# If cluster is also undefined, the service runs in standalone mode. +cluster: + discovery: + type: dns + options: + # hostname that will be resolved + domain_name: machinegun-headless + # name that will be used to construct the full nodename (for example name@127.0.0.1) + sname: machinegun + # optional, supported values: global_based | partition_based, default global_based + scaling: partition_based + # required when scaling = partition_based + partitioning: + capacity: 31 + max_hash: 4095 + # optional, default value 5000 ms + reconnect_timeout: 5000 + +# if undefined, {mg_core_procreg_consuela, #{pulse => pulse(YamlConfig)}} will be used; +# if consuela is undefined, mg_core_procreg_gproc will be used +process_registry: + module: mg_core_procreg_global + +limits: + process_heap: 2M # heap limit + disk: # used only for the health check + path: "/" + value: 99% + memory: # return 503 if the limit is breached + type: cgroups # cgroups | total + value: 90% + scheduler_tasks: 5000 +logging: + root: /var/log/mg + burst_limit_enable: false + sync_mode_qlen: 100 + drop_mode_qlen: 1000 + flush_qlen: 2000 + json_log: log.json + level: info + formatter: + max_length: 1000 + max_printable_string_length: 80 + level_map: + 'emergency': 'ERROR' + 'alert': 'ERROR' + 'critical': 'ERROR' + 'error': 'ERROR' + 'warning': 'WARN' + 'notice': 'INFO' + 'info': 'INFO' + 'debug': 'DEBUG' + +namespaces: + mg_test_ns: + # only for testing, default 0 + # suicide_probability: 0.1 + event_sinks: + machine: + type: machine + machine_id: main_event_sink + kafka: + type: kafka + client: default_kafka_client + topic: mg_test_ns + default_processing_timeout: 30s + timer_processing_timeout: 60s + reschedule_timeout: 60s + hibernate_timeout: 5s + shutdown_timeout: 1s # worker shutdown timeout (see notes above) + unload_timeout: 60s + processor: + url: http://localhost:8022/processor + pool_size: 50 + http_keep_alive_timeout: 10s + timers: + scan_interval: 1m + scan_limit: 1000 + capacity: 500 + min_scan_delay: 10s + overseer: disabled + notification: + capacity: 1000 + # search for new notification tasks in storage every x + scan_interval: 1m + # if the search had a continuation, read the continuation after x amount of time + min_scan_delay: 1s + # only search for notification tasks that are older than x + scan_handicap: 10s + # only search for notification tasks that are younger than x + scan_cutoff: 4W + # reschedule notification deliveries that failed with temporary errors x amount of time into the future + reschedule_time: 5s + # maximum number of events that will be stored inside of the machine state + # must be a non-negative integer, default is 0 + event_stash_size: 5 + modernizer: + current_format_version: 1 + handler: + url: http://localhost:8022/modernizer + pool_size: 50 + http_keep_alive_timeout: 10s +snowflake_machine_id: 1 +# memory storage backend +# storage: +# type: memory +# riak storage backend +storage: + type: riak + host: riak-mg + port: 8078 + pool: + size: 100 + queue_max: 1000 + connect_timeout: 5s + request_timeout: 10s + index_query_timeout: 10s + batch_concurrency_limit: 50 +# Docs on what these options do +# https://www.tiot.jp/riak-docs/riak/kv/3.2.0/developing/usage/replication +# https://www.tiot.jp/riak-docs/riak/kv/3.2.0/learn/concepts/eventual-consistency/ + r_options: + r: quorum + pr: quorum + sloppy_quorum: false + w_options: + w: 4 + pw: 4 + 
dw: 4 + sloppy_quorum: false + d_options: + sloppy_quorum: false + +## kafka settings example +kafka: + default_kafka_client: + endpoints: + - host: "kafka1" + port: 9092 + - host: "kafka2" + port: 9092 + - host: "kafka3" + port: 9092 + ssl: + certfile: "client.crt" + keyfile: "client.key" + cacertfile: "ca.crt" + sasl: + mechanism: scram_sha_512 # Available: scram_sha_512, scram_sha_256, plain + # *Either* specify the `file` field or `username` and `password` fields. + # `file` is the path to a text file which contains two lines, + # first line for username and second line for password. + # Presence of the `file` field will override the presence of + # `username` and `password` fields (there is no fallback). + file: secret.txt + # ** OR ** + username: root + password: qwerty + producer: + compression: no_compression # 'gzip' or 'snappy' to enable compression + # How many message sets (per-partition) can be sent to the kafka broker + # asynchronously before receiving ACKs from the broker. + partition_onwire_limit: 1 + # Maximum time the broker can await the receipt of the + # number of acknowledgements in RequiredAcks. The timeout is not an exact + # limit on the request time for a few reasons: (1) it does not include + # network latency, (2) the timer begins at the beginning of the processing + # of this request so if many requests are queued due to broker overload + # that wait time will not be included, (3) the kafka leader will not terminate + # a local write so if the local write time exceeds this timeout it will + # not be respected. + ack_timeout: 10s + # How many acknowledgements the kafka broker should receive from the + # clustered replicas before acking the producer. + # none: the broker will not send any response + # (this is the only case where the broker will not reply to a request) + # leader_only: The leader will wait until the data is written to the local log before + # sending a response. + # all_isr: If it is 'all_isr' the broker will block until the message is committed by + # all in-sync replicas before acking. + required_acks: all_isr + # How many requests (per-partition) can be buffered without blocking the + # caller. The callers are released (by receiving the + # 'brod_produce_req_buffered' reply) once the request is taken into the buffer + # and after the request has been put on wire, then the caller may expect + # a reply 'brod_produce_req_acked' when the request is accepted by kafka. + partition_buffer_limit: 256 + # Messages are allowed to 'linger' in the buffer for this amount of + # time before being sent. + # Definition of 'linger': A message is in 'linger' state when it is allowed + # to be sent on-wire, but chosen not to (for better batching). + max_linger: 0ms + # At most this amount (count, not size) of messages are allowed to 'linger' + # in the buffer. Messages will be sent regardless of 'linger' age when this + # threshold is hit. + # NOTE: It does not make sense to have this value set larger than + # `partition_buffer_limit' + max_linger_count: 0 + # In case callers are producing faster than brokers can handle (or + # congestion on wire), try to accumulate small requests into batches + # as much as possible but not exceeding max_batch_size.
+ # OBS: If compression is enabled, care should be taken when picking + # the max batch size, because a compressed batch will be produced + # as one message and this message might be larger than + # 'max.message.bytes' in the kafka config (or topic config) + max_batch_size: 1M + # If {max_retries, N} is given, the producer retries the produce request up to + # N times before crashing in case of failures like the connection being + # shut down by the remote or exceptions received in the produce response from kafka. + # The special value N = -1 means 'retry indefinitely' + max_retries: 3 + # Time in milliseconds to sleep before retrying the failed produce request. + retry_backoff: 500ms + +## +## a short example for HG +## +# service_name: machinegun +# namespaces: +# invoice: +# event_sink: payproc +# processor: +# url: http://hellgate:8022/v1/stateproc/invoice +# party: +# event_sink: payproc +# processor: +# url: http://hellgate:8022/v1/stateproc/party +# domain-config: +# processor: +# url: http://dominant:8022/v1/stateproc/party +# storage: +# type: memory + +## +## a minimal config +## +# service_name: machinegun +# namespaces: +# mg_test_ns: +# processor: +# url: http://localhost:8022/processor +# storage: +# type: memory diff --git a/config/config.yaml b/config/config.yaml index ccba38d..920bd42 100644 --- a/config/config.yaml +++ b/config/config.yaml @@ -1,110 +1,14 @@ -# -# Copyright 2020 RBKmoney -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# +service_name: mg -### This is one and only file to configure machinegun service. -### -### Configuration is specific for a single instance of machinegun, it's up to -### you to ensure that each instance is configured properly. This usually boils -### down to sharing namespaces configuration between instances and assigning -### proper nodenames so that instances could see and communicate with each -### other over Erlang distribution. -### -### If you find some configuration knobs are missing here do not hesitate to -### send a PR. -### -### Notes: -### -### * Configuration string support environment variable interpolation. -### -### The syntax is `${VARNAME}` where `VARNAME` is the name of referenced -### environment variable. If referenced the variable MUST be defined, -### otherwise it's a configuration error. There is no special handling of -### variable values: empty variable for example will be interpolated as -### empty string. -### -### For example, given `HOST_IP=10.0.42.42` one could define: -### dist_node_name: machinegun@${HOST_IP} -### -### * Graceful shutdowns.
-### -### There are multiple `shutdown_timeout` parameters defined in this example config: -### * one for the woody server -### * one for each machinegun namespace -### To calculate the actual maximum time this service takes to shut down gracefully -### you need to take the woody server `shutdown_timeout` parameter, add the maximum -### value between all of the `shutdown_timeout` parameters defined for each namespace: -### -### max_shutdown_time = -### woody_server.shutdown_timeout + -### max(namespaces[].shutdown_timeout) -### -### +#dist_node_name: mg@${POD_IP} +#dist_port: +# mode: static +# port: 31337 -# Name of the service. -# -# Defaults to: 'machinegun'. -service_name: machinegun - -# Name of the node for Erlang distribution. -# -# Defaults to: '{service_name}@{hostname}'. -# Examples: -# * with short node name: -# dist_node_name: machinegun -# * with fixed ip for host part: -# dist_node_name: machinegun@10.0.0.42 -# * for `machinegun@{primary_netif_ip}`, with latter determined at start time: -# dist_node_name: -# hostpart: ip -# * for `blarg@{fqdn}`, if latter available at start time: -# dist_node_name: -# namepart: blarg -# hostpart: fqdn -dist_node_name: - hostpart: hostname - -# Mode and allocation of ports for Erlang distribution. -# -# No defaults here, default behaviour is dictated by ERTS. -# Examples: -# * disable EMPD altogether and set a fixed port both for listening and -# communicating with remote nodes: -# dist_port: -# mode: static -# port: 31337 -dist_port: - mode: epmd - # Which ports to pick from when setting up a distribution listener? - range: [31337, 31340] - -# Erlang VM options. erlang: - # Path to a file which holds Erlang distribution cookie. - # The cookie is _sensitive_ piece of information so handle it with caution. - # - # Must be set, there's no default. - secret_cookie_file: "config/cookie" - ipv6: true - disable_dns_cache: false + ipv6: false + secret_cookie_file: /opt/mg/etc/cookie -# TODO Retire this option, rely upon OTEL env variables -# https://empayre.youtrack.cloud/issue/TD-838 -# Opentelemetry settings -# By default opentelemetry is disabled which is equivalent to -# "opentelemetry: disabled" opentelemetry: # TODO Describe sampling # Name of the service to use in recording machinegun's spans @@ -116,50 +20,10 @@ opentelemetry: protocol: http/protobuf endpoint: http://jaeger:4318 -# API server options. 
-woody_server: - ip: "::" - port: 8022 - http_keep_alive_timeout: 60s - shutdown_timeout: 0s # woody server shutdown timeout (see notes above) - -# Cluster assembler -# if cluster undefined then standalone mode -cluster: - discovery: - type: dns - options: - # hostname that will be resolved - domain_name: machinegun-headless - # name that will be used for construct full nodename (for example name@127.0.0.1) - sname: machinegun - # optional, default value 5000 ms - reconnect_timeout: 5000 - -# if undefined then 'mg_core_procreg_gproc' will be used -process_registry: - module: mg_core_procreg_global - -limits: - process_heap: 2M # heap limit - disk: # uses only for health check - path: "/" - value: 99% - memory: # return 503 if breaks - type: cgroups # cgroups | total - value: 90% - scheduler_tasks: 5000 logging: - root: /var/log/mg - burst_limit_enable: false - sync_mode_qlen: 100 - drop_mode_qlen: 1000 - flush_qlen: 2000 - json_log: log.json - level: info + out_type: stdout + level: warning formatter: - max_length: 1000 - max_printable_string_length: 80 level_map: 'emergency': 'ERROR' 'alert': 'ERROR' @@ -170,84 +34,91 @@ logging: 'info': 'INFO' 'debug': 'DEBUG' +woody_server: + ip: "::" + port: 8022 + max_concurrent_connections: 8000 + http_keep_alive_timeout: 3000ms + shutdown_timeout: 5s + +storage: + type: riak + host: riakdb + port: 8087 + pool: + size: 100 + queue_max: 500 + batch_concurrency_limit: 10 + connect_timeout: 500ms + request_timeout: 10s + index_query_timeout: 60s + +worker: + message_queue_len_limit: 1000 + +process_registry: + module: mg_core_procreg_gproc + +cluster: + discovery: + type: dns + options: + domain_name: machinegun-ha-headless + sname: mg + scaling: partition_based + partitioning: + capacity: 5 + max_hash: 4095 + reconnect_timeout: 5000 + namespaces: - mg_test_ns: - # only for testing, default 0 - # suicide_probability: 0.1 + NS: + timers: + scan_interval: 60s + scan_limit: 2000 + capacity: 3500 + min_scan_delay: 5s + overseer: + scan_interval: 60m + min_scan_delay: 5s + retries: + storage: + type: exponential + max_retries: infinity + factor: 2 + timeout: 10ms + max_timeout: 60s + timers: + type: exponential + max_retries: 100 + factor: 2 + timeout: 2s + max_timeout: 30m + processor: + type: exponential + max_retries: + max_total_timeout: 1d + factor: 2 + timeout: 10ms + max_timeout: 60s + continuation: + type: exponential + max_retries: infinity + factor: 2 + timeout: 10ms + max_timeout: 60s event_sinks: kafka: type: kafka + topic: test_transaction client: default_kafka_client - topic: mg_test_ns - default_processing_timeout: 30s - timer_processing_timeout: 60s - reschedule_timeout: 60s - hibernate_timeout: 5s - shutdown_timeout: 1s # worker shutdown timeout (see notes above) - unload_timeout: 60s processor: - url: http://localhost:8022/processor - pool_size: 50 - http_keep_alive_timeout: 10s - timers: - scan_interval: 1m - scan_limit: 1000 - capacity: 500 - min_scan_delay: 10s - overseer: disabled - notification: - capacity: 1000 - # search for new notification tasks in storage every x - scan_interval: 1m - # if the search had a continuation, read the continuation after x amount of time - min_scan_delay: 1s - # only search for notification tasks that are older than x - scan_handicap: 10s - # only search for notification tasks that are younger than x - scan_cutoff: 4W - # reschedule notification deliveries that failed with temporary errors x amount of time into the future - reschedule_time: 5s - # maximum number of events that will be stored inside of 
machine state - # must be non negative integer, default is 0 - event_stash_size: 5 - modernizer: - current_format_version: 1 - handler: - url: http://localhost:8022/modernizer - pool_size: 50 - http_keep_alive_timeout: 10s -snowflake_machine_id: 1 -# memory storage backend -# storage: -# type: memory -# riak storage backend -storage: - type: riak - host: riak-mg - port: 8078 - pool: - size: 100 - queue_max: 1000 - connect_timeout: 5s - request_timeout: 10s - index_query_timeout: 10s - batch_concurrency_limit: 50 -# Docs on what these options do -# https://www.tiot.jp/riak-docs/riak/kv/3.2.0/developing/usage/replication -# https://www.tiot.jp/riak-docs/riak/kv/3.2.0/learn/concepts/eventual-consistency/ - r_options: - r: quorum - pr: quorum - sloppy_quorum: false - w_options: - w: 4 - pw: 4 - dw: 4 - sloppy_quorum: false - d_options: - sloppy_quorum: false + url: http://mg-0:8023/processor + pool_size: 2000 + http_keep_alive_timeout: 3000ms + unload_timeout: 180s + shutdown_timeout: 5s -## kafka settings example kafka: default_kafka_client: endpoints: @@ -257,73 +128,14 @@ kafka: port: 9092 - host: "kafka3" port: 9092 - ssl: - certfile: "client.crt" - keyfile: "client.key" - cacertfile: "ca.crt" - sasl: - mechanism: scram_sha_512 # Available: scram_sha_512, scram_sha_265, plain - # *Either* specify the `file` field or `username` and `password` fields. - # `file` is the path to a text file which contains two lines, - # first line for username and second line for password. - # Presence of the `file` field will override the presence of - # `username` and `password` fields (there is no fallback). - file: secret.txt - # ** OR ** - username: root - password: qwerty producer: - compression: no_compression # 'gzip' or 'snappy' to enable compression - # How many message sets (per-partition) can be sent to kafka broker - # asynchronously before receiving ACKs from broker. + compression: no_compression partition_onwire_limit: 1 - # Maximum time the broker can await the receipt of the - # number of acknowledgements in RequiredAcks. The timeout is not an exact - # limit on the request time for a few reasons: (1) it does not include - # network latency, (2) the timer begins at the beginning of the processing - # of this request so if many requests are queued due to broker overload - # that wait time will not be included, (3) kafka leader will not terminate - # a local write so if the local write time exceeds this timeout it will - # not be respected. ack_timeout: 10s - # How many acknowledgements the kafka broker should receive from the - # clustered replicas before acking producer. - # none: the broker will not send any response - # (this is the only case where the broker will not reply to a request) - # leader_only: The leader will wait the data is written to the local log before - # sending a response. - # all_isr: If it is 'all_isr' the broker will block until the message is committed by - # all in sync replicas before acking. required_acks: all_isr - # How many requests (per-partition) can be buffered without blocking the - # caller. The callers are released (by receiving the - # 'brod_produce_req_buffered' reply) once the request is taken into buffer - # and after the request has been put on wire, then the caller may expect - # a reply 'brod_produce_req_acked' when the request is accepted by kafka. partition_buffer_limit: 256 - # Messages are allowed to 'linger' in buffer for this amount of - # time before being sent. 
- # Definition of 'linger': A message is in 'linger' state when it is allowed - # to be sent on-wire, but chosen not to (for better batching). max_linger: 0ms - # At most this amount (count not size) of messages are allowed to 'linger' - # in buffer. Messages will be sent regardless of 'linger' age when this - # threshold is hit. - # NOTE: It does not make sense to have this value set larger than - # `partition_buffer_limit' max_linger_count: 0 - # In case callers are producing faster than brokers can handle (or - # congestion on wire), try to accumulate small requests into batches - # as much as possible but not exceeding max_batch_size. - # OBS: If compression is enabled, care should be taken when picking - # the max batch size, because a compressed batch will be produced - # as one message and this message might be larger than - # 'max.message.bytes' in kafka config (or topic config) max_batch_size: 1M - # If {max_retries, N} is given, the producer retry produce request for - # N times before crashing in case of failures like connection being - # shutdown by remote or exceptions received in produce response from kafka. - # The special value N = -1 means 'retry indefinitely' max_retries: 3 - # Time in milli-seconds to sleep before retry the failed produce request. retry_backoff: 500ms diff --git a/elvis.config b/elvis.config index ae0fa37..f4b5865 100644 --- a/elvis.config +++ b/elvis.config @@ -42,7 +42,7 @@ mg_core_gen_squad, mg_core_gen_squad_heart, mg_core_storage_memory, - mg_core_union, + mg_core_cluster, mg_core_worker ] }}, @@ -111,7 +111,12 @@ % We want to use `ct:pal/2` and friends in test code. {elvis_style, no_debug_call, disable}, % Tests are usually more comprehensible when a bit more verbose. - {elvis_style, dont_repeat_yourself, #{min_complexity => 20}}, + {elvis_style, dont_repeat_yourself, #{ + min_complexity => 20, + ignore => [ + mg_woody_tests_SUITE + ] + }}, {elvis_style, god_modules, #{ ignore => [ mg_prometheus_metric_SUITE, diff --git a/rebar.config b/rebar.config index 398da9c..7186801 100644 --- a/rebar.config +++ b/rebar.config @@ -85,12 +85,13 @@ {overlay, [ {template, "rel_scripts/entrypoint.sh", "bin/entrypoint.sh"}, {copy, "rel_scripts/configurator.escript", "bin/configurator.escript"}, - {copy, "config/config.yaml", "etc/config.yaml"} + {copy, "config/config.yaml", "etc/config.yaml"}, + {copy, "config/cookie", "etc/cookie"} ]} ]} ]}, {test, [ - {deps, [{proper, "1.4.0"}]}, + {deps, [{proper, "1.4.0"}, {meck, "0.9.2"}]}, {cover_enabled, true}, {cover_excl_apps, [mg_cth]}, {dialyzer, [{plt_extra_apps, [eunit, common_test, proper]}]} diff --git a/rebar.config.script b/rebar.config.script new file mode 100644 index 0000000..f294de0 --- /dev/null +++ b/rebar.config.script @@ -0,0 +1,23 @@ +case os:getenv("IP") of + false -> CONFIG; + IpString -> + Node = erlang:list_to_atom("mg@" ++ IpString), + {value, {profiles, Profiles0}, Config0} = lists:keytake(profiles, 1, CONFIG), + {value, {test, TestProfile0}, Profiles1} = lists:keytake(test, 1, Profiles0), + TestProfile = lists:keystore( + dist_node, + 1, + TestProfile0, + {dist_node, [ + {setcookie, 'HI-MARK!'}, + {name, Node} + ]} + ), + Profiles = lists:keystore( + test, + 1, + Profiles1, + {test, TestProfile} + ), + lists:keystore(profiles, 1, Config0, {profiles, Profiles}) +end. 
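The new rebar.config.script conditionally turns the test profile into a distributed one: when the IP environment variable is set (the dev entrypoint exports it from the container's primary address), the common_test node gets a deterministic name and cookie. As an illustration only, with a hypothetical IP=10.254.0.5 the effective test profile would contain an entry like:

%% Sketch of the injected entry (address is hypothetical); the rest of the
%% test profile (deps, cover, dialyzer) is preserved by lists:keystore/4.
{dist_node, [
    {setcookie, 'HI-MARK!'},
    {name, 'mg@10.254.0.5'}
]}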
diff --git a/rel_scripts/configurator.escript b/rel_scripts/configurator.escript index 4426047..e82be0d 100755 --- a/rel_scripts/configurator.escript +++ b/rel_scripts/configurator.escript @@ -350,17 +350,28 @@ cluster(YamlConfig) -> <<"dns">> -> DiscoveryOptsList = ?C:conf([cluster, discovery, options], YamlConfig), ReconnectTimeout = ?C:conf([cluster, reconnect_timeout], YamlConfig, 5000), - #{ - discovery => #{ - module => mg_core_union, - options => maps:from_list(DiscoveryOptsList) - }, + PartitionsOpts = partitions_options(YamlConfig), + genlib_map:compact(#{ + discovering => maps:from_list(DiscoveryOptsList), + scaling => scaling(YamlConfig), + partitioning => PartitionsOpts, reconnect_timeout => ReconnectTimeout - }; + }); _ -> #{} end. +partitions_options(YamlConfig) -> + case ?C:conf([cluster, partitioning], YamlConfig, undefined) of + undefined -> + undefined; + ListOpts -> + lists:foldl(fun({Key, Value}, Acc) -> Acc#{erlang:binary_to_atom(Key) => Value} end, #{}, ListOpts) + end. + +scaling(YamlConfig) -> + ?C:atom(?C:conf([cluster, scaling], YamlConfig, <<"global_based">>)). + quotas(YamlConfig) -> SchedulerLimit = ?C:conf([limits, scheduler_tasks], YamlConfig, 5000), [ @@ -518,7 +529,8 @@ namespace({Name, NSYamlConfig}, YamlConfig) -> schedulers => namespace_schedulers(NSYamlConfig), event_sinks => [event_sink(ES) || ES <- ?C:conf([event_sinks], NSYamlConfig, [])], suicide_probability => ?C:probability(?C:conf([suicide_probability], NSYamlConfig, 0)), - event_stash_size => ?C:conf([event_stash_size], NSYamlConfig, 0) + event_stash_size => ?C:conf([event_stash_size], NSYamlConfig, 0), + scaling => scaling(YamlConfig) }, conf_with([modernizer], NSYamlConfig, #{}, fun(ModernizerYamlConfig) -> #{ @@ -618,6 +630,7 @@ event_sink(kafka, Name, ESYamlConfig) -> topic => ?C:conf([topic], ESYamlConfig) }}. 
+%% TODO procreg(YamlConfig) -> % Use process_registry if it's set up or gproc otherwise conf_with( diff --git a/test_resources/authorized_keys b/test_resources/authorized_keys new file mode 100644 index 0000000..36c6d89 --- /dev/null +++ b/test_resources/authorized_keys @@ -0,0 +1 @@ +ssh-rsa AAAAB3NzaC1yc2EAAAADAQABAAABgQDRD4mOIKKi7qJ39h1Ll4ej8sQTVJW0r1XXiK/eigSjCln1+K2zEvDKr4UCKaLeQaObcbcPdYQFPSIsyOYbKoFi1I/VkiUnkwIn516/VRx+zeCM3D2BhC+c3ikGbQnYK/dToyOT4w641j0BtKmMf80Y2rIyHFcFlEPEoICM9SznvkqcFSSs/qs6fALffvtOnaZsVUVhF+elOOupOuYGVvaMANPOWeyj5GqZ+mX/6vEPLglGD6vrt17GjJL/iz2XHhI/gej+AcRQ0fdfQKXWeUPAeijqFSY65D5GqLIl5AhRqizXCh0lNkodSJ0A4u2J31mniC1jZoVBkmV3O3Kq1ODdExcDDs8JUIaVAxXdHD5MnQnY3Bnm/yfB6sb8oUg/FC74IRqrIuXmMUlNBvu1N6zA775cXR1wSPkXTSP4KeKyYcJIpyfAeZoNBed6oqn4nRag6FiGXsgm6Z1Cdz1yn3kCCIWFgYmjMhdw6O8ZXrDPNh2uS87ZpgGXHpfzcZtrqU0= root@mg-0 diff --git a/test_resources/entrypoint.sh.dev b/test_resources/entrypoint.sh.dev new file mode 100644 index 0000000..29df861 --- /dev/null +++ b/test_resources/entrypoint.sh.dev @@ -0,0 +1,5 @@ +#!/bin/sh +ADDR=`ip route get 8.8.8.8 | grep -oP "src \K[^ ]+"` +export IP=${ADDR} +ip route add throw 10.254.254.0/24 +exec "$@" diff --git a/test_resources/entrypoint.sh.test b/test_resources/entrypoint.sh.test new file mode 100644 index 0000000..aa0fa92 --- /dev/null +++ b/test_resources/entrypoint.sh.test @@ -0,0 +1,4 @@ +#!/bin/sh +ADDR=`ip route get 8.8.8.8 | grep -oP "src \K[^ ]+"` +echo "dist_node_name: mg@${ADDR}" >> /opt/mg/etc/config.yaml +/usr/sbin/sshd -D diff --git a/test_resources/ssh/config b/test_resources/ssh/config new file mode 100644 index 0000000..d6e34f5 --- /dev/null +++ b/test_resources/ssh/config @@ -0,0 +1 @@ +StrictHostKeyChecking off diff --git a/test_resources/ssh/id_rsa b/test_resources/ssh/id_rsa new file mode 100644 index 0000000..2a05530 --- /dev/null +++ b/test_resources/ssh/id_rsa @@ -0,0 +1,38 @@ +-----BEGIN OPENSSH PRIVATE KEY----- +b3BlbnNzaC1rZXktdjEAAAAABG5vbmUAAAAEbm9uZQAAAAAAAAABAAABlwAAAAdzc2gtcn +NhAAAAAwEAAQAAAYEA0Q+JjiCiou6id/YdS5eHo/LEE1SVtK9V14iv3ooEowpZ9fitsxLw +yq+FAimi3kGjm3G3D3WEBT0iLMjmGyqBYtSP1ZIlJ5MCJ+dev1Ucfs3gjNw9gYQvnN4pBm +0J2Cv3U6Mjk+MOuNY9AbSpjH/NGNqyMhxXBZRDxKCAjPUs575KnBUkrP6rOnwC3377Tp2m +bFVFYRfnpTjrqTrmBlb2jADTzlnso+Rqmfpl/+rxDy4JRg+r67dexoyS/4s9lx4SP4Ho/g +HEUNH3X0Cl1nlDwHoo6hUmOuQ+RqiyJeQIUaos1wodJTZKHUidAOLtid9Zp4gtY2aFQZJl +dztyqtTg3RMXAw7PCVCGlQMV3Rw+TJ0J2NwZ5v8nwerG/KFIPxQu+CEaqyLl5jFJTQb7tT +eswO++XF0dcEj5F00j+CnismHCSKcnwHmaDQXneqKp+J0WoOhYhl7IJumdQnc9cp95AgiF +hYGJozIXcOjvGV6wzzYdrkvO2aYBlx6X83Gba6lNAAAFgNTjC3PU4wtzAAAAB3NzaC1yc2 +EAAAGBANEPiY4goqLuonf2HUuXh6PyxBNUlbSvVdeIr96KBKMKWfX4rbMS8MqvhQIpot5B +o5txtw91hAU9IizI5hsqgWLUj9WSJSeTAifnXr9VHH7N4IzcPYGEL5zeKQZtCdgr91OjI5 +PjDrjWPQG0qYx/zRjasjIcVwWUQ8SggIz1LOe+SpwVJKz+qzp8At9++06dpmxVRWEX56U4 +66k65gZW9owA085Z7KPkapn6Zf/q8Q8uCUYPq+u3XsaMkv+LPZceEj+B6P4BxFDR919Apd +Z5Q8B6KOoVJjrkPkaosiXkCFGqLNcKHSU2Sh1InQDi7YnfWaeILWNmhUGSZXc7cqrU4N0T +FwMOzwlQhpUDFd0cPkydCdjcGeb/J8HqxvyhSD8ULvghGqsi5eYxSU0G+7U3rMDvvlxdHX +BI+RdNI/gp4rJhwkinJ8B5mg0F53qiqfidFqDoWIZeyCbpnUJ3PXKfeQIIhYWBiaMyF3Do +7xlesM82Ha5LztmmAZcel/Nxm2upTQAAAAMBAAEAAAGAYOEUW3qgI2T2gSTaGoeT4dPELT +kLTvnZi9HZvgSzdWJ8odGlnNBwKV0BBCmLQfek+4nMzSsmDM9xoNNQXtJptwTNyqi48wfa +/eboLz4fwFtjbaM6FWTOM6F33XR2FWj6ahW1jPixf9I33yx7TZKD1rqxzSr44Kr+ZIYETE +3pi1LRfFcH8erqKmYBZtSPXLUNxDIXvpC3Vgd0na2fntx50BMqE/vz/1cAV26ECf4zy1cI +ESF+B/Onxdaq4CUEW50g/RqNEnUWjr7w5CJObNj4dEWBUxL8IRMxNJ5LmXwWj9rdi+Umj4 +Ei7k+kKp81hIZwIGyQca5wzQytkamTtoRoUFij2UfoHJN3nOeRgLwWKpEKLofwJIP1nLU/ 
+MK6gSl8SJRR0lxztoRMTPD0QGGtgOj8f0QK+2xkvAb6TtTkCZRqC0V1HouWINzZ7vAPkWg +CwH0ajsUHvSfDyzvF/rP5hxuM67J/KCzar6EwugpJeiZaEOc5C2rTb9yOptDrWQ715AAAA +wQDv7bwQ528rohf/Skpwa9zT9LlyHdLb7iW+wV8CMqe3f5mPrAth4yE5Wp8Kdp4gg/2yRI +pQZr276noLcWHViUHlUrKjgI+sCqbK8gVrLYiid2z4gCt8TmPojhALXXPmaiIuFma0U3jR +qFtoz93cS8pU3rI8KWdBPP4l/EeM0B+l/GyVQm+3smAZSPYwCezLX6kXLTxVMbOWtKSPj9 +RuWRdq8O7/bH9Trr2MHC+q5BrEnU51ddOm8gbxdhauXZX7a14AAADBAPDhtkXric90HUad +EQISy50W8im5zw5+9Mry2w4W4WWPoqcQbC396pQCK7Rus+7wi11JeODkmU6iYyCMyw5e3F +yxLSQ/MCEIMYOLSv8mq3LXhsnED9SKNMWIp5ypj1AqMF9a20P+5FUC5zNj/JvHSCUEzK7G +bTuUSHT8KM3sLxd1LU2hzFgNKLJhopD1+BatNy28V0IwZKX2NhAtcHLQgDMQ1Bqb4fO76r +By5UNP6sD4GxqnqsJnnZsIJbbr+dZUVwAAAMEA3i6NRpepUbpgMs9+YYenNtfHU7u6idDP +Suzh1XcA47IsIEYkTsUsectEwXEsIybfXFQbwCtZFHUE9zfoMjupl8HsJZ6rBlQQe9YuXW +GQKiTTMTK4lwh6m9wYrru2xFWQ2Ff6N99vc86sa1Tnas3Eo77kQePnCCXYT8vaLJQOJwvf +WHOFP/PpBysL/lRnDCjNw++KlOds9mdVmMlu32wfYy5eFgYvz7CDcrmEZJL4drCq/ire2A +JM42SLsas41cj7AAAACXJvb3RAbWctMAE= +-----END OPENSSH PRIVATE KEY----- diff --git a/test_resources/ssh/id_rsa.pub b/test_resources/ssh/id_rsa.pub new file mode 100644 index 0000000..36c6d89 --- /dev/null +++ b/test_resources/ssh/id_rsa.pub @@ -0,0 +1 @@ +ssh-rsa AAAAB3NzaC1yc2EAAAADAQABAAABgQDRD4mOIKKi7qJ39h1Ll4ej8sQTVJW0r1XXiK/eigSjCln1+K2zEvDKr4UCKaLeQaObcbcPdYQFPSIsyOYbKoFi1I/VkiUnkwIn516/VRx+zeCM3D2BhC+c3ikGbQnYK/dToyOT4w641j0BtKmMf80Y2rIyHFcFlEPEoICM9SznvkqcFSSs/qs6fALffvtOnaZsVUVhF+elOOupOuYGVvaMANPOWeyj5GqZ+mX/6vEPLglGD6vrt17GjJL/iz2XHhI/gej+AcRQ0fdfQKXWeUPAeijqFSY65D5GqLIl5AhRqizXCh0lNkodSJ0A4u2J31mniC1jZoVBkmV3O3Kq1ODdExcDDs8JUIaVAxXdHD5MnQnY3Bnm/yfB6sb8oUg/FC74IRqrIuXmMUlNBvu1N6zA775cXR1wSPkXTSP4KeKyYcJIpyfAeZoNBed6oqn4nRag6FiGXsgm6Z1Cdz1yn3kCCIWFgYmjMhdw6O8ZXrDPNh2uS87ZpgGXHpfzcZtrqU0= root@mg-0
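For reference, the routing step all of this supports: mg_woody_automaton_balancer forwards each request via erpc to whichever node mg_core_cluster:get_node/1 picks for the {NS, ID} balancing key. The partitioning options used in the tests (capacity => 5, max_hash => 4095) suggest a hash-partitioned mapping along the following lines; this is a hypothetical sketch, since the actual mg_core_cluster algorithm is not part of this diff and may differ:

%% Hypothetical sketch only: hash the balancing key into [0, MaxHash],
%% fold it into one of Capacity partitions, then pick the owning node.
-spec example_get_node(term(), [node()]) -> node().
example_get_node(BalancingKey, Nodes) ->
    MaxHash = 4095,
    Capacity = 5,
    Hash = erlang:phash2(BalancingKey, MaxHash + 1),
    Partition = Hash rem Capacity,
    %% assumption: partitions map onto the sorted node list round-robin
    lists:nth((Partition rem length(Nodes)) + 1, lists:sort(Nodes)).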