diff --git a/lib/xandra.ex b/lib/xandra.ex index a4f0087e..600c8199 100644 --- a/lib/xandra.ex +++ b/lib/xandra.ex @@ -444,7 +444,7 @@ defmodule Xandra do """ ], transport_options: [ - type: :keyword_list, + type: {:or, [:keyword_list, {:list, :any}]}, doc: """ Options to forward to the socket transport. If the `:encryption` option is `true`, then the transport is SSL (see the Erlang `:ssl` module) otherwise it's diff --git a/lib/xandra/cluster/control_connection.ex b/lib/xandra/cluster/control_connection.ex index 6ba50319..8bf940c4 100644 --- a/lib/xandra/cluster/control_connection.ex +++ b/lib/xandra/cluster/control_connection.ex @@ -102,9 +102,13 @@ defmodule Xandra.Cluster.ControlConnection do {transport_opts, connection_opts} = Keyword.pop(connection_opts, :transport_options, []) + # Construct the transport options. + {keyword_options, other_options} = + Enum.split_with(transport_opts, fn x -> Keyword.keyword?([x]) end) + transport = %Transport{ module: module, - options: Keyword.merge(transport_opts, @forced_transport_options) + options: Keyword.merge(keyword_options, @forced_transport_options) ++ other_options } {transport, connection_opts} diff --git a/lib/xandra/connection.ex b/lib/xandra/connection.ex index 86072a37..a968ac23 100644 --- a/lib/xandra/connection.ex +++ b/lib/xandra/connection.ex @@ -409,16 +409,21 @@ defmodule Xandra.Connection do nil -> data.original_options end + # Construct the transport options. + {keyword_options, other_options} = + options + |> Keyword.get(:transport_options, []) + |> Enum.split_with(fn x -> Keyword.keyword?([x]) end) + # Now, build the state from the options. {address, port} = Keyword.fetch!(options, :node) transport = %Transport{ module: if(options[:encryption], do: :ssl, else: :gen_tcp), options: - options - |> Keyword.get(:transport_options, []) - |> Keyword.put_new(:buffer, @default_transport_buffer_size) - |> Keyword.merge(@forced_transport_options) + (keyword_options + |> Keyword.put_new(:buffer, @default_transport_buffer_size) + |> Keyword.merge(@forced_transport_options)) ++ other_options } data = %__MODULE__{ diff --git a/lib/xandra/options_validators.ex b/lib/xandra/options_validators.ex index 31b9ec7d..2aa743bb 100644 --- a/lib/xandra/options_validators.ex +++ b/lib/xandra/options_validators.ex @@ -30,16 +30,35 @@ defmodule Xandra.OptionsValidators do @spec validate_node(term()) :: {:ok, {String.t(), integer()}} | {:error, String.t()} def validate_node(value) when is_binary(value) do - case String.split(value, ":", parts: 2) do - [address, port] -> - case Integer.parse(port) do - {port, ""} when port in 0..65535 -> {:ok, {address, port}} - {port, ""} -> {:error, "invalid port (outside of the 0..65535 range): #{port}"} - _ -> {:error, "invalid node: #{inspect(value)}"} - end - - [address] -> - {:ok, {address, 9042}} + # Remove surrounding square brackets from IPv6 values. + value = value |> String.replace("[", "") |> String.replace("]", "") + + {address, port} = + case String.split(value, ":") do + # FQDN or IPv4. + [address] -> + {address, "9042"} + + # FQDN:PORT or IPv4:PORT. IPv6 addresses have to have at least 2 colons. + [address, port] -> + {address, port} + + # IPv6. + splits -> + # Our IPv6 address can either parse as a valid address or have the + # additional port suffix. + with {:ok, {_, _, _, _, _, _, _, _}} <- + :inet.parse_address(value |> String.to_charlist()) do + {value, "9042"} + else + _ -> {splits |> Enum.drop(-1) |> Enum.join(":"), List.last(splits)} + end + end + + case Integer.parse(port) do + {port, ""} when port in 0..65535 -> {:ok, {address, port}} + {port, ""} -> {:error, "invalid port (outside of the 0..65535 range): #{port}"} + _ -> {:error, "invalid node: #{inspect(value)}"} end end diff --git a/test/xandra/cluster_test.exs b/test/xandra/cluster_test.exs index 47463ed4..453e3f39 100644 --- a/test/xandra/cluster_test.exs +++ b/test/xandra/cluster_test.exs @@ -147,6 +147,19 @@ defmodule Xandra.ClusterTest do Xandra.Cluster.start_link(port: 9042) end end + + test "with the :inet6 transport option" do + assert {:ok, _conn} = Xandra.Cluster.start_link(nodes: ["::1"], transport_options: [:inet6]) + + assert {:ok, _conn} = + Xandra.Cluster.start_link(nodes: ["::1:9042"], transport_options: [:inet6]) + + assert {:ok, _conn} = + Xandra.Cluster.start_link(nodes: ["[::1]"], transport_options: [:inet6]) + + assert {:ok, _conn} = + Xandra.Cluster.start_link(nodes: ["[::1]:9042"], transport_options: [:inet6]) + end end describe "start_link/1" do @@ -257,6 +270,16 @@ defmodule Xandra.ClusterTest do assert GenServer.whereis(name) == pid stop_supervised!(name) end + + @tag telemetry_events: [[:xandra, :connected]] + test "successfully connect even with an invalid transport option", + %{base_options: opts, telemetry_ref: telemetry_ref} do + opts = Keyword.merge(opts, transport_options: [active: true]) + _pid = start_link_supervised!({Cluster, opts}) + + # Assert that we already received events. + assert_received {[:xandra, :connected], ^telemetry_ref, %{}, %{}} + end end describe "starting up" do diff --git a/test/xandra/transport_test.exs b/test/xandra/transport_test.exs index 98278563..798c23c8 100644 --- a/test/xandra/transport_test.exs +++ b/test/xandra/transport_test.exs @@ -17,6 +17,18 @@ defmodule Xandra.TransportTest do assert %Transport{} = transport = Transport.close(transport) assert transport.socket == nil end + + test "returns an IPv6-compatible transport" do + assert {:ok, listen_socket} = :gen_tcp.listen(0, [:inet6]) + assert {:ok, port} = :inet.port(listen_socket) + + transport = %Transport{module: :gen_tcp, options: [:inet6]} + assert {:ok, transport} = Transport.connect(transport, ~c"::1", port, 5000) + assert transport.socket != nil + + assert %Transport{} = transport = Transport.close(transport) + assert transport.socket == nil + end end describe "is_* macros" do diff --git a/test/xandra_test.exs b/test/xandra_test.exs index 854e76c9..f2f4eb51 100644 --- a/test/xandra_test.exs +++ b/test/xandra_test.exs @@ -33,6 +33,9 @@ defmodule XandraTest do assert_raise ArgumentError, ~r{the :nodes option can't be an empty list}, fn -> Xandra.start_link(nodes: []) end + + # IPv6 Connection + assert {:ok, _conn} = Xandra.start_link(nodes: ["127.0.0.1"], transport_options: [:inet6]) end test "validates the :authentication option" do @@ -192,6 +195,19 @@ defmodule XandraTest do assert_received {[:xandra, :failed_to_connect], ^telemetry_ref, %{}, %{connection: ^conn}} end + + test "invalid transport option gets forcibly overwritten" do + telemetry_ref = + :telemetry_test.attach_event_handlers(self(), [[:xandra, :connected]]) + + # Set a transport option `active: true` that normally would cause the + # connection to fail. This connection should succeed because start_link + # forcibly overrides and sets some necessary options. + options = Keyword.merge(default_start_options(), transport_options: [active: true]) + + conn = start_supervised!({Xandra, options}) + assert_receive {[:xandra, :connected], ^telemetry_ref, %{}, %{connection: ^conn}} + end end describe "execute/3,4" do