From da3a5b8babf2842ac5f384b263baf3c8ae602783 Mon Sep 17 00:00:00 2001 From: alde103 Date: Sun, 11 Aug 2024 22:03:33 -0600 Subject: [PATCH 1/2] TCP Client: refactor & packet_format argument added --- lib/tcp/client.ex | 304 ++++++++++++++++++++++++---------------------- 1 file changed, 157 insertions(+), 147 deletions(-) diff --git a/lib/tcp/client.ex b/lib/tcp/client.ex index 977479f..9af6e3a 100644 --- a/lib/tcp/client.ex +++ b/lib/tcp/client.ex @@ -8,17 +8,18 @@ defmodule Modbux.Tcp.Client do require Logger @timeout 2000 - @port 502 + @tcp_port 502 @ip {0, 0, 0, 0} @active false - @to 2000 + @packet_format :raw defstruct ip: nil, tcp_port: nil, socket: nil, - timeout: @to, + timeout: @timeout, active: false, - transid: 0, + packet_format: @packet_format, + transaction_id: 0, status: nil, d_pid: nil, msg_len: 0, @@ -39,6 +40,7 @@ defmodule Modbux.Tcp.Client do * `ip` - is the internet address of the desired Modbux TCP Server. * `tcp_port` - is the desired Modbux TCP Server tcp port number. * `timeout` - is the connection timeout. + * `packet_fotmat` - is the :gen_tcp `packet` argument, it accepts 0 | 1 | 2 | 4 | :raw, values (default: :raw). * `active` - (`true` or `false`) specifies whether data is received as messages (mailbox) or by calling `confirmation/1` each time `request/2` is called. @@ -78,6 +80,7 @@ defmodule Modbux.Tcp.Client do * `ip` - is the internet address of the desired Modbux TCP Server. * `tcp_port` - is the Modbux TCP Server tcp port number . * `timeout` - is the connection timeout. + * `packet_fotmat` - is the :gen_tcp `packet` argument, it accepts 0 | 1 | 2 | 4 | :raw, values (default: :raw). * `active` - (`true` or `false`) specifies whether data is received as messages (mailbox) or by calling `confirmation/1` each time `request/2` is called. """ @@ -132,20 +135,30 @@ defmodule Modbux.Tcp.Client do # callbacks def init(args) do - port = args[:tcp_port] || @port - ip = args[:ip] || @ip - timeout = args[:timeout] || @timeout - status = :closed - - active = - if args[:active] == nil do - @active - else - args[:active] - end - - state = %Client{ip: ip, tcp_port: port, timeout: timeout, status: status, active: active} - {:ok, state} + with tcp_port <- Keyword.get(args, :tcp_port, @tcp_port), + true <- is_integer(tcp_port), + ip <- Keyword.get(args, :ip, @ip), + true <- is_tuple(ip), + timeout <- Keyword.get(args, :timeout, @timeout), + true <- is_integer(timeout), + packet_format <- Keyword.get(args, :packet_format, @packet_format), + true <- packet_format in [0, 1, 2, 4, :raw, :line], + active <- Keyword.get(args, :active, @active), + true <- is_boolean(active) do + state = %Client{ + ip: ip, + tcp_port: tcp_port, + timeout: timeout, + active: active, + packet_format: packet_format, + status: :closed + } + + {:ok, state} + else + _ -> + {:stop, :einval} + end end def handle_call(:state, from, state) do @@ -153,157 +166,92 @@ defmodule Modbux.Tcp.Client do {:reply, state, state} end - def handle_call({:configure, args}, _from, state) do - case state.status do - :closed -> - port = args[:tcp_port] || state.tcp_port - ip = args[:ip] || state.ip - timeout = args[:timeout] || state.timeout - d_pid = args[:d_pid] || state.d_pid - - active = - if args[:active] == nil do - state.active - else - args[:active] - end - - new_state = %Client{state | ip: ip, tcp_port: port, timeout: timeout, active: active, d_pid: d_pid} - {:reply, :ok, new_state} + def handle_call({:configure, args}, _from, %{status: :closed} = state) do + with tcp_port <- Keyword.get(args, :tcp_port, state.tcp_port), + true <- is_integer(tcp_port), + ip <- Keyword.get(args, :ip, state.ip), + true <- is_tuple(ip), + timeout <- Keyword.get(args, :timeout, state.timeout), + true <- is_integer(timeout), + d_pid <- Keyword.get(args, :d_pid, state.d_pid), + true <- is_pid(d_pid) or is_nil(d_pid), + packet_format <- Keyword.get(args, :packet_format, state.packet_format), + true <- packet_format in [0, 1, 2, 4, :raw, :line], + active <- Keyword.get(args, :active, state.active), + true <- is_boolean(active) do + new_state = %Client{ + state + | ip: ip, + tcp_port: tcp_port, + timeout: timeout, + active: active, + packet_format: packet_format, + d_pid: d_pid + } + {:reply, :ok, new_state} + else _ -> - {:reply, :error, state} + {:reply, {:error, :einval}, state} end end - def handle_call(:connect, {from, _ref}, state) do - Logger.debug("(#{__MODULE__}, :connect) state: #{inspect(state)}") - Logger.debug("(#{__MODULE__}, :connect) from: #{inspect(from)}") + def handle_call({:configure, _args}, _from, state) do + {:reply, :error, state} + end + + def handle_call(:connect, {from, _ref}, %{d_pid: d_pid} = state) do + Logger.debug("(#{__MODULE__}, :connect) state: #{inspect(state)} from: #{inspect(from)}") case :gen_tcp.connect( state.ip, state.tcp_port, - [:binary, packet: :raw, active: state.active], + [:binary, packet: state.packet_format, active: state.active], state.timeout ) do {:ok, socket} -> - ctrl_pid = - if state.d_pid == nil do - from - else - state.d_pid - end - - # state + ctrl_pid = d_pid || from new_state = %Client{state | socket: socket, status: :connected, d_pid: ctrl_pid} {:reply, :ok, new_state} {:error, reason} -> Logger.error("(#{__MODULE__}, :connect) reason #{inspect(reason)}") - # state {:reply, {:error, reason}, state} end end + def handle_call(:close, _from, %{socket: nil} = state) do + Logger.error("(#{__MODULE__}, :close) No port to close") + {:reply, {:error, :closed}, state} + end + def handle_call(:close, _from, state) do Logger.debug("(#{__MODULE__}, :close) state: #{inspect(state)}") - - if state.socket != nil do - new_state = close_socket(state) - {:reply, :ok, new_state} - else - Logger.error("(#{__MODULE__}, :close) No port to close") - # state - {:reply, {:error, :closed}, state} - end + {:reply, :ok, close_socket(state)} end - def handle_call({:request, cmd}, _from, state) do + def handle_call({:request, _cmd}, _from, %{status: :closed} = state) do Logger.debug("(#{__MODULE__}, :request) state: #{inspect(state)}") + {:reply, {:error, :closed}, state} + end - case state.status do - :connected -> - request = Tcp.pack_req(cmd, state.transid) - length = Tcp.res_len(cmd) - - case :gen_tcp.send(state.socket, request) do - :ok -> - new_state = - if state.active do - new_msg = Map.put(state.pending_msg, state.transid, cmd) - - n_msg = - if state.transid + 1 > 0xFFFF do - 0 - else - state.transid + 1 - end - - %Client{state | msg_len: length, cmd: cmd, pending_msg: new_msg, transid: n_msg} - else - %Client{state | msg_len: length, cmd: cmd} - end - - {:reply, :ok, new_state} - - {:error, :closed} -> - new_state = close_socket(state) - {:reply, {:error, :closed}, new_state} - - {:error, reason} -> - {:reply, {:error, reason}, state} - end - - :closed -> - {:reply, {:error, :closed}, state} - end + def handle_call({:request, cmd}, _from, %{status: :connected} = state) do + Logger.debug("(#{__MODULE__}, :request) state: #{inspect(state)}") + {genserver_response, new_state} = send_modbus_tcp_request(cmd, state) + {:reply, genserver_response, new_state} end - # only in passive mode - def handle_call(:confirmation, _from, state) do + # only in passive mode (active: false) + def handle_call(:confirmation, _from, %{status: status, active: active} = state) + when status == :closed or active == true do Logger.debug("(#{__MODULE__}, :confirmation) state: #{inspect(state)}") + {:reply, {:error, :closed}, state} + end - if state.active do - {:reply, :error, state} - else - case state.status do - :connected -> - case :gen_tcp.recv(state.socket, state.msg_len, state.timeout) do - {:ok, response} -> - values = Tcp.parse_res(state.cmd, response, state.transid) - Logger.debug("(#{__MODULE__}, :confirmation) response: #{inspect(response)}") - - n_msg = - if state.transid + 1 > 0xFFFF do - 0 - else - state.transid + 1 - end - - new_state = %Client{state | transid: n_msg, cmd: nil, msg_len: 0} - - case values do - # escribió algo - nil -> - {:reply, :ok, new_state} - - # leemos algo - _ -> - {:reply, {:ok, values}, new_state} - end - - {:error, reason} -> - Logger.error("(#{__MODULE__}, :confirmation) reason: #{inspect(reason)}") - # cerrar? - new_state = close_socket(state) - new_state = %Client{new_state | cmd: nil, msg_len: 0} - {:reply, {:error, reason}, new_state} - end - - :closed -> - {:reply, {:error, :closed}, state} - end - end + def handle_call(:confirmation, _from, %{status: :connected} = state) do + Logger.debug("(#{__MODULE__}, :confirmation) state: #{inspect(state)}") + {genserver_response, new_state} = receive_modbus_tcp_confirmation(state) + {:reply, genserver_response, new_state} end def handle_call(:flush, _from, state) do @@ -313,24 +261,23 @@ defmodule Modbux.Tcp.Client do # only for active mode (active: true) def handle_info({:tcp, _port, response}, state) do - Logger.debug("(#{__MODULE__}, :message_active) response: #{inspect(response)}") - Logger.debug("(#{__MODULE__}, :message_active) state: #{inspect(state)}") + Logger.debug("(#{__MODULE__}, :message_active) response: #{inspect(response)} state: #{inspect(state)}") h = :binary.at(response, 0) l = :binary.at(response, 1) - transid = h * 256 + l - Logger.debug("(#{__MODULE__}, :message_active) transid: #{inspect(transid)}") + transaction_id = h * 256 + l + Logger.debug("(#{__MODULE__}, :message_active) transaction_id: #{inspect(transaction_id)}") - case Map.fetch(state.pending_msg, transid) do + case Map.fetch(state.pending_msg, transaction_id) do :error -> Logger.error("(#{__MODULE__}, :message_active) unknown transaction id") {:noreply, state} {:ok, cmd} -> - values = Tcp.parse_res(cmd, response, transid) + values = Tcp.parse_res(cmd, response, transaction_id) msg = {:modbus_tcp, cmd, values} send(state.d_pid, msg) - new_pending_msg = Map.delete(state.pending_msg, transid) + new_pending_msg = Map.delete(state.pending_msg, transaction_id) new_state = %Client{state | cmd: nil, msg_len: 0, pending_msg: new_pending_msg} {:noreply, new_state} end @@ -349,7 +296,70 @@ defmodule Modbux.Tcp.Client do defp close_socket(state) do :ok = :gen_tcp.close(state.socket) - new_state = %Client{state | socket: nil, status: :closed} - new_state + %Client{state | socket: nil, status: :closed} + end + + defp send_modbus_tcp_request(cmd, state) do + request = Tcp.pack_req(cmd, state.transaction_id) + length = Tcp.res_len(cmd) + + case :gen_tcp.send(state.socket, request) do + :ok -> + new_state = build_successful_request_new_state(cmd, length, state) + + {:ok, new_state} + + {:error, :closed} -> + new_state = close_socket(state) + {{:error, :closed}, new_state} + + {:error, reason} -> + {{:error, reason}, state} + end + end + + defp build_successful_request_new_state(cmd, length, %{active: false} = state), + do: %Client{state | msg_len: length, cmd: cmd} + + defp build_successful_request_new_state(cmd, length, %{active: true} = state) do + pending_msg = Map.put(state.pending_msg, state.transaction_id, cmd) + transaction_id = increase_transaction_id(state.transaction_id) + + %Client{ + state + | msg_len: length, + cmd: cmd, + pending_msg: pending_msg, + transaction_id: transaction_id + } end + + defp increase_transaction_id(0xFFFF), do: 0 + defp increase_transaction_id(transaction_id), do: transaction_id + 1 + + defp receive_modbus_tcp_confirmation(state) do + case :gen_tcp.recv(state.socket, state.msg_len, state.timeout) do + {:ok, response} -> + Logger.debug("(#{__MODULE__}, :confirmation) response: #{inspect(response)}") + + values = Tcp.parse_res(state.cmd, response, state.transaction_id) + + transaction_id = increase_transaction_id(state.transaction_id) + + new_state = %Client{state | transaction_id: transaction_id, cmd: nil, msg_len: 0} + + build_successful_confirmation_new_state(values, new_state) + + {:error, reason} -> + Logger.error("(#{__MODULE__}, :confirmation) reason: #{inspect(reason)}") + new_state = close_socket(state) + new_state = %Client{new_state | cmd: nil, msg_len: 0} + {{:error, reason}, new_state} + end + end + + # Write Request + defp build_successful_confirmation_new_state(nil, new_state), do: {:ok, new_state} + # Read Request + defp build_successful_confirmation_new_state(values, new_state), do: {{:ok, values}, new_state} end From 64c37746fcf36762de168995f1137b687aaacb72 Mon Sep 17 00:00:00 2001 From: alde103 Date: Sun, 11 Aug 2024 22:05:01 -0600 Subject: [PATCH 2/2] TCP Server: packet_format argument added --- lib/tcp/server/server.ex | 66 +++++++++++++++++++++++++--------------- 1 file changed, 41 insertions(+), 25 deletions(-) diff --git a/lib/tcp/server/server.ex b/lib/tcp/server/server.ex index 4552c54..4be812d 100644 --- a/lib/tcp/server/server.ex +++ b/lib/tcp/server/server.ex @@ -7,14 +7,16 @@ defmodule Modbux.Tcp.Server do use GenServer, restart: :transient require Logger - @port 502 - @to :infinity + @tcp_port 502 + @timeout :infinity + @packet_format :raw defstruct ip: nil, model_pid: nil, tcp_port: nil, - timeout: nil, + timeout: @timeout, listener: nil, + packet_format: @packet_format, parent_pid: nil, sup_pid: nil, acceptor_pid: nil @@ -28,6 +30,7 @@ defmodule Modbux.Tcp.Server do * `timeout` - is the connection timeout. * `model` - defines the DB initial state. * `sup_otps` - server supervisor OTP options. + * `packet_fotmat` - is the :gen_tcp `packet` argument, it accepts 0 | 1 | 2 | 4 | :raw, values (default: :raw). * `active` - (`true` or `false`) enable/disable DB updates notifications (mailbox). The messages (when active mode is true) have the following form: @@ -70,8 +73,8 @@ defmodule Modbux.Tcp.Server do | {:priority, :high | :low | :normal}} | {:timeout, :infinity | non_neg_integer} ]) :: :ignore | {:error, any} | {:ok, pid} - def start_link(params, opts \\ []) do - GenServer.start_link(__MODULE__, {params, self()}, opts) + def start_link(server_parameters, opts \\ []) do + GenServer.start_link(__MODULE__, {server_parameters, self()}, opts) end @spec stop(atom | pid | {atom, any} | {:via, atom, any}) :: :ok @@ -105,24 +108,36 @@ defmodule Modbux.Tcp.Server do GenServer.call(pid, :get_db) end - def init({params, parent_pid}) do - port = Keyword.get(params, :port, @port) - timeout = Keyword.get(params, :timeout, @to) - parent_pid = if Keyword.get(params, :active, false), do: parent_pid - model = Keyword.fetch!(params, :model) - {:ok, model_pid} = Shared.start_link(model: model) - sup_opts = Keyword.get(params, :sup_opts, []) - {:ok, sup_pid} = Server.Supervisor.start_link(sup_opts) - - state = %Server{ - tcp_port: port, - model_pid: model_pid, - timeout: timeout, - parent_pid: parent_pid, - sup_pid: sup_pid - } - - {:ok, state, {:continue, :setup}} + def init({server_parameters, parent_pid}) do + parent_pid = if Keyword.get(server_parameters, :active, false), do: parent_pid + with tcp_port <- Keyword.get(server_parameters, :port, @tcp_port), + true <- is_integer(tcp_port), + timeout <- Keyword.get(server_parameters, :timeout, @timeout), + true <- is_integer(timeout) or timeout == :infinity, + packet_format <- Keyword.get(server_parameters, :packet_format, @packet_format), + true <- packet_format in [0, 1, 2, 4, :raw, :line], + true <- is_pid(parent_pid) or is_nil(parent_pid), + sup_opts <- Keyword.get(server_parameters, :sup_opts, []), + true <- is_list(sup_opts), + model <- Keyword.fetch!(server_parameters, :model), + true <- is_map(model), + true <- Map.keys(model) |> Enum.all?(fn key -> is_integer(key) end) do + {:ok, model_pid} = Shared.start_link(model: model) + {:ok, sup_pid} = Server.Supervisor.start_link(sup_opts) + + state = %Server{ + tcp_port: tcp_port, + model_pid: model_pid, + timeout: timeout, + parent_pid: parent_pid, + sup_pid: sup_pid + } + + {:ok, state, {:continue, :setup}} + else + _ -> + {:stop, :einval} + end end def terminate(:normal, _state), do: nil @@ -160,7 +175,8 @@ defmodule Modbux.Tcp.Server do end defp listener_setup(state) do - case :gen_tcp.listen(state.tcp_port, [:binary, packet: :raw, active: true, reuseaddr: true]) do + listener_opts = [:binary, packet: state.packet_format, active: true, reuseaddr: true] + case :gen_tcp.listen(state.tcp_port, listener_opts) do {:ok, listener} -> {:ok, {ip, _port}} = :inet.sockname(listener) accept = Task.async(fn -> accept(state, listener) end) @@ -181,7 +197,7 @@ defmodule Modbux.Tcp.Server do def close_alive_sockets(port) do Port.list() - |> Enum.filter(fn x -> Port.info(x)[:name] == 'tcp_inet' end) + |> Enum.filter(fn x -> Port.info(x)[:name] == ~c"tcp_inet" end) |> Enum.filter(fn x -> {:ok, {{0, 0, 0, 0}, port}} == :inet.sockname(x) || {:ok, {{127, 0, 0, 1}, port}} == :inet.sockname(x) end)