Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion lib/xandra.ex
Original file line number Diff line number Diff line change
Expand Up @@ -444,7 +444,7 @@ defmodule Xandra do
"""
],
transport_options: [
type: :keyword_list,
type: {:or, [:keyword_list, {:list, :any}]},
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

{:list, :any} is a superset of :keyword_list. Maybe we can go with this instead:

Suggested change
type: {:or, [:keyword_list, {:list, :any}]},
type: {:list, {:or, [{:tuple, [:atom, :any]}, :atom]}},

or just go with {:list, :any}.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see that the :gen_tcp options support additionally the inet_family atoms as well as lists and binaries. I don't have any examples of what those might be, but, if you're okay with it, we could start with {:list, :any}.

option() =
    {active, true | false | once | -32768..32767} |
    ...
    {low_watermark, integer() >= 0} |
    {mode, list | binary} |
    list |
    binary |
    ...

Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yep sure let's go with tat.

doc: """
Options to forward to the socket transport. If the `:encryption` option is `true`,
then the transport is SSL (see the Erlang `:ssl` module) otherwise it's
Expand Down
6 changes: 5 additions & 1 deletion lib/xandra/cluster/control_connection.ex
Original file line number Diff line number Diff line change
Expand Up @@ -102,9 +102,13 @@ defmodule Xandra.Cluster.ControlConnection do

{transport_opts, connection_opts} = Keyword.pop(connection_opts, :transport_options, [])

# Construct the transport options.
{keyword_options, other_options} =
Enum.split_with(transport_opts, fn x -> Keyword.keyword?([x]) end)
Comment on lines +105 to +107
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You don't need to do this. Do:

options: @forced_transport_options ++ transport_opts

as putting the forced options at the beginning overrides later options. Please double check this but I’m pretty sure 🙃

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I added a conflicting transport option here: {:active, true} which causes the connection to fail.

I tried both flipped configurations to see how ordering affected option precedence, but in both cases the connection failed.

options: @forced_transport_options ++ transport_opts

and

options: transport_opts ++ @forced_transport_options
{:ok, conn} = Xandra.Cluster.start_link(nodes: ["my-scylla-cluster.internal"], transport_options: [:inet6, {:active, false}], authentication: {Xandra.Authenticator.Password, [username: System.get_env("SCYLLA_USERNAME"), password: System.get_env("SCYLLA_PASSWORD")]}, sync_connect: 1000)
IO.inspect(@forced_transport_options ++ transport_opts)
IO.inspect(transport_opts ++ @forced_transport_options)

# [{:packet, :raw}, {:mode, :binary}, {:active, false}, :inet6, {:active, true}]
# [:inet6, {:active, true}, {:packet, :raw}, {:mode, :binary}, {:active, false}]


transport = %Transport{
module: module,
options: Keyword.merge(transport_opts, @forced_transport_options)
options: Keyword.merge(keyword_options, @forced_transport_options) ++ other_options
}

{transport, connection_opts}
Expand Down
13 changes: 9 additions & 4 deletions lib/xandra/connection.ex
Original file line number Diff line number Diff line change
Expand Up @@ -409,16 +409,21 @@ defmodule Xandra.Connection do
nil -> data.original_options
end

# Construct the transport options.
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same here, but you can use proplists:

    transport_options = Keyword.get(options, :transport_options, [])
    buffer = :proplists.get_value(transport_options, :buffer, @default_transport_buffer_size)
    transport_options = [buffer: buffer] ++ @forced_transport_options ++ :proplists.delete(:buffer, transport_options)

Should work but please double check 😬

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This ran into the same issue as in the other comment where I could override the @forced_transport_options unexpectedly with {:active, true}.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Relevant to both of these comments, I've added tests to check if the forced transport opts are being respected.

Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@erhlee-bird mh I’m confused:

iex> {:ok, socket} = :gen_tcp.connect(~c"google.com", 80, [{:active, true}, {:active, false}])
{:ok, #Port<0.4>}
iex> :inet.getopts(socket, [:active])
{:ok, [active: false]}

Seems like overriding works here?

{keyword_options, other_options} =
options
|> Keyword.get(:transport_options, [])
|> Enum.split_with(fn x -> Keyword.keyword?([x]) end)

# Now, build the state from the options.
{address, port} = Keyword.fetch!(options, :node)

transport = %Transport{
module: if(options[:encryption], do: :ssl, else: :gen_tcp),
options:
options
|> Keyword.get(:transport_options, [])
|> Keyword.put_new(:buffer, @default_transport_buffer_size)
|> Keyword.merge(@forced_transport_options)
(keyword_options
|> Keyword.put_new(:buffer, @default_transport_buffer_size)
|> Keyword.merge(@forced_transport_options)) ++ other_options
}

data = %__MODULE__{
Expand Down
39 changes: 29 additions & 10 deletions lib/xandra/options_validators.ex
Original file line number Diff line number Diff line change
Expand Up @@ -30,16 +30,35 @@ defmodule Xandra.OptionsValidators do

@spec validate_node(term()) :: {:ok, {String.t(), integer()}} | {:error, String.t()}
def validate_node(value) when is_binary(value) do
case String.split(value, ":", parts: 2) do
[address, port] ->
case Integer.parse(port) do
{port, ""} when port in 0..65535 -> {:ok, {address, port}}
{port, ""} -> {:error, "invalid port (outside of the 0..65535 range): #{port}"}
_ -> {:error, "invalid node: #{inspect(value)}"}
end

[address] ->
{:ok, {address, 9042}}
# Remove surrounding square brackets from IPv6 values.
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Instead of this logic, we could "pop" out the port like this, regardless of the type of IP using Regex.split/2 with lookahead:

case Regex.split(~r/:(?=\d+$)/, value) do
  [address, port] ->
    # Same as before

  [address] ->
    # Also same as before
end

This way we don't have to parse the IPv6 address, we can just leave what the user passed in

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I thought about doing that, but there's ambiguity if you have IPv6 addresses without a port. The first 2 examples below using the Regex method above parse the address incorrectly.

iex(3)> Xandra.OptionsValidators.validate_node("::1")
{:ok, {":", 1}}
iex(4)> Xandra.OptionsValidators.validate_node("2001:db8::1")
{:ok, {"2001:db8:", 1}}
iex(5)> Xandra.OptionsValidators.validate_node("[2001:db8::1]:9042")
{:ok, {"2001:db8::1", 9042}}

Copy link
Owner

@whatyouhide whatyouhide Dec 7, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Works with a lookbehind assertion:

iex(2)> Regex.split(~r/(?<!:):(?=\d+$)/, "::1")
["::1"]
iex(3)> Regex.split(~r/(?<!:):(?=\d+$)/, "2001:db8::1")
["2001:db8::1"]
iex(5)> Regex.split(~r/(?<!:):(?=\d+$)/, "[2001:db8::1]:9042")
["[2001:db8::1]", "9042"]

(need to then address |> String.trim_leading("[") |> String.trim_trailing("]"))

value = value |> String.replace("[", "") |> String.replace("]", "")

{address, port} =
case String.split(value, ":") do
# FQDN or IPv4.
[address] ->
{address, "9042"}

# FQDN:PORT or IPv4:PORT. IPv6 addresses have to have at least 2 colons.
[address, port] ->
{address, port}

# IPv6.
splits ->
# Our IPv6 address can either parse as a valid address or have the
# additional port suffix.
with {:ok, {_, _, _, _, _, _, _, _}} <-
:inet.parse_address(value |> String.to_charlist()) do
{value, "9042"}
else
_ -> {splits |> Enum.drop(-1) |> Enum.join(":"), List.last(splits)}
end
end

case Integer.parse(port) do
{port, ""} when port in 0..65535 -> {:ok, {address, port}}
{port, ""} -> {:error, "invalid port (outside of the 0..65535 range): #{port}"}
_ -> {:error, "invalid node: #{inspect(value)}"}
end
end

Expand Down
23 changes: 23 additions & 0 deletions test/xandra/cluster_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,19 @@ defmodule Xandra.ClusterTest do
Xandra.Cluster.start_link(port: 9042)
end
end

test "with the :inet6 transport option" do
assert {:ok, _conn} = Xandra.Cluster.start_link(nodes: ["::1"], transport_options: [:inet6])

assert {:ok, _conn} =
Xandra.Cluster.start_link(nodes: ["::1:9042"], transport_options: [:inet6])

assert {:ok, _conn} =
Xandra.Cluster.start_link(nodes: ["[::1]"], transport_options: [:inet6])

assert {:ok, _conn} =
Xandra.Cluster.start_link(nodes: ["[::1]:9042"], transport_options: [:inet6])
end
end

describe "start_link/1" do
Expand Down Expand Up @@ -257,6 +270,16 @@ defmodule Xandra.ClusterTest do
assert GenServer.whereis(name) == pid
stop_supervised!(name)
end

@tag telemetry_events: [[:xandra, :connected]]
test "successfully connect even with an invalid transport option",
%{base_options: opts, telemetry_ref: telemetry_ref} do
opts = Keyword.merge(opts, transport_options: [active: true])
_pid = start_link_supervised!({Cluster, opts})

# Assert that we already received events.
assert_received {[:xandra, :connected], ^telemetry_ref, %{}, %{}}
end
end

describe "starting up" do
Expand Down
12 changes: 12 additions & 0 deletions test/xandra/transport_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,18 @@ defmodule Xandra.TransportTest do
assert %Transport{} = transport = Transport.close(transport)
assert transport.socket == nil
end

test "returns an IPv6-compatible transport" do
assert {:ok, listen_socket} = :gen_tcp.listen(0, [:inet6])
assert {:ok, port} = :inet.port(listen_socket)

transport = %Transport{module: :gen_tcp, options: [:inet6]}
assert {:ok, transport} = Transport.connect(transport, ~c"::1", port, 5000)
assert transport.socket != nil

assert %Transport{} = transport = Transport.close(transport)
assert transport.socket == nil
end
end

describe "is_* macros" do
Expand Down
16 changes: 16 additions & 0 deletions test/xandra_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,9 @@ defmodule XandraTest do
assert_raise ArgumentError, ~r{the :nodes option can't be an empty list}, fn ->
Xandra.start_link(nodes: [])
end

# IPv6 Connection
assert {:ok, _conn} = Xandra.start_link(nodes: ["127.0.0.1"], transport_options: [:inet6])
end

test "validates the :authentication option" do
Expand Down Expand Up @@ -192,6 +195,19 @@ defmodule XandraTest do

assert_received {[:xandra, :failed_to_connect], ^telemetry_ref, %{}, %{connection: ^conn}}
end

test "invalid transport option gets forcibly overwritten" do
telemetry_ref =
:telemetry_test.attach_event_handlers(self(), [[:xandra, :connected]])

# Set a transport option `active: true` that normally would cause the
# connection to fail. This connection should succeed because start_link
# forcibly overrides and sets some necessary options.
options = Keyword.merge(default_start_options(), transport_options: [active: true])

conn = start_supervised!({Xandra, options})
assert_receive {[:xandra, :connected], ^telemetry_ref, %{}, %{connection: ^conn}}
end
end

describe "execute/3,4" do
Expand Down