Skip to content
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
83 changes: 82 additions & 1 deletion lib/mix/tasks/bench.ex
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ defmodule Mix.Tasks.Bench do
* `--partition-count` - Number of partitions (default: schedulers_online)
* `--max-messages` - Maximum messages to generate (default: unlimited, mutually exclusive with --duration)
* `--through` - Pipeline stage to run through: "full", "reorder_buffer", or "sps" (default: "full")
* `--tprof` - Enable tprof call_time profiling of Sequin.* modules (default: false)
* `--tprof-limit` - Number of top functions to show in tprof report (default: 60)

## Examples

Expand All @@ -37,6 +39,9 @@ defmodule Mix.Tasks.Bench do

# Run through SlotProcessorServer (includes message handler, stops before Broadway)
mix benchmark --through sps

# Run with tprof profiling to see which functions dominate wall time
mix benchmark --duration 30 --tprof
"""
use Mix.Task

Expand Down Expand Up @@ -80,7 +85,9 @@ defmodule Mix.Tasks.Bench do
pk_collision_rate: :float,
partition_count: :integer,
max_messages: :integer,
through: :string
through: :string,
tprof: :boolean,
tprof_limit: :integer
]
)

Expand All @@ -91,6 +98,8 @@ defmodule Mix.Tasks.Bench do
partition_count = Keyword.get(opts, :partition_count, @default_partition_count)
max_messages = Keyword.get(opts, :max_messages)
through = opts |> Keyword.get(:through, "full") |> String.to_existing_atom()
tprof? = Keyword.get(opts, :tprof, false)
tprof_limit = Keyword.get(opts, :tprof_limit, 60)

if max_messages && duration_opt do
Mix.raise("--duration and --max-messages are mutually exclusive")
Expand All @@ -106,6 +115,11 @@ defmodule Mix.Tasks.Bench do
# Start the application
Mix.Task.run("app.start")

# Start tprof if requested
if tprof? do
start_tprof()
end

announce("#{@bold}=== Sequin Pipeline Benchmark ===#{@reset}", @cyan)
IO.puts("")

Expand All @@ -117,6 +131,7 @@ defmodule Mix.Tasks.Bench do
IO.puts(" Partition count: #{partition_count}")
IO.puts(" Max messages: #{max_messages || "unlimited"}")
IO.puts(" Through: #{through}")
IO.puts(" tprof: #{tprof?}")
IO.puts("")

# Setup replication slot
Expand Down Expand Up @@ -321,10 +336,76 @@ defmodule Mix.Tasks.Bench do
pipeline_tracked
)

# Print tprof report if enabled
if tprof? do
stop_and_report_tprof(tprof_limit)
end

# Cleanup
cleanup_entities(consumer, replication)
end

defp start_tprof do
:tprof.start(%{type: :call_time})
:tprof.enable_trace(:all)

for {mod, _} <- :code.all_loaded(),
mod_str = Atom.to_string(mod),
String.starts_with?(mod_str, "Elixir.Sequin.") do
:tprof.set_pattern(mod, :_, :_)
end

announce("tprof profiling enabled (tracing Sequin.* modules)", @yellow)
end

defp stop_and_report_tprof(limit) do
:tprof.disable_trace(:all)
{:call_time, raw} = :tprof.collect()
:tprof.stop()

grand_total =
raw
|> Enum.map(fn {_, _, _, pid_data} ->
pid_data |> Enum.map(fn {_, _, time} -> time end) |> Enum.sum()
end)
|> Enum.sum()

rows =
raw
|> Enum.map(fn {mod, fun, arity, pid_data} ->
total_calls = pid_data |> Enum.map(fn {_, calls, _} -> calls end) |> Enum.sum()
total_time = pid_data |> Enum.map(fn {_, _, time} -> time end) |> Enum.sum()
{mod, fun, arity, total_calls, total_time}
end)
|> Enum.sort_by(fn {_, _, _, _, time} -> time end, :desc)
|> Enum.take(limit)

IO.puts("")
announce("#{@bold}tprof Call Time Profile (top #{limit} functions):#{@reset}", @cyan)
IO.puts("")

header =
" #{String.pad_trailing("FUNCTION", 70)} #{String.pad_leading("CALLS", 12)} #{String.pad_leading("TIME (ms)", 12)} #{String.pad_leading("% TOTAL", 9)}"

IO.puts(header)
IO.puts(" #{String.duplicate("-", 105)}")

Enum.each(rows, fn {mod, fun, arity, calls, time_us} ->
mfa = "#{inspect(mod)}.#{fun}/#{arity}"
time_ms = Float.round(time_us / 1000, 1)
pct = if grand_total > 0, do: Float.round(time_us / grand_total * 100, 1), else: 0.0

IO.puts(
" #{String.pad_trailing(mfa, 70)} #{String.pad_leading(format_number(calls), 12)} #{String.pad_leading(:erlang.float_to_binary(time_ms, decimals: 1), 12)} #{String.pad_leading(:erlang.float_to_binary(pct, decimals: 1) <> "%", 9)}"
)
end)

IO.puts("")
total_ms = Float.round(grand_total / 1000, 1)
IO.puts(" Total traced time: #{total_ms}ms")
IO.puts("")
end

defp cleanup_entities(_consumer, replication) do
{:ok, database} = Databases.get_db(replication.postgres_database_id)
{:ok, account} = Accounts.get_account(database.account_id)
Expand Down
Loading