From 8b9d72ac00c515f81c38b53d2b0167dcd7245837 Mon Sep 17 00:00:00 2001 From: RTLS Date: Wed, 18 Feb 2026 23:02:37 -0800 Subject: [PATCH] =?UTF-8?q?=C2=B5=20add=20:tprof=20to=20bench?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- lib/mix/tasks/bench.ex | 83 +++++++++++++++++++++++++++++++++++++++++- 1 file changed, 82 insertions(+), 1 deletion(-) diff --git a/lib/mix/tasks/bench.ex b/lib/mix/tasks/bench.ex index a0037f48a..e7765d370 100644 --- a/lib/mix/tasks/bench.ex +++ b/lib/mix/tasks/bench.ex @@ -17,6 +17,8 @@ defmodule Mix.Tasks.Bench do * `--partition-count` - Number of partitions (default: schedulers_online) * `--max-messages` - Maximum messages to generate (default: unlimited, mutually exclusive with --duration) * `--through` - Pipeline stage to run through: "full", "reorder_buffer", or "sps" (default: "full") + * `--tprof` - Enable tprof call_time profiling of Sequin.* modules (default: false) + * `--tprof-limit` - Number of top functions to show in tprof report (default: 60) ## Examples @@ -37,6 +39,9 @@ defmodule Mix.Tasks.Bench do # Run through SlotProcessorServer (includes message handler, stops before Broadway) mix benchmark --through sps + + # Run with tprof profiling to see which functions dominate wall time + mix benchmark --duration 30 --tprof """ use Mix.Task @@ -80,7 +85,9 @@ defmodule Mix.Tasks.Bench do pk_collision_rate: :float, partition_count: :integer, max_messages: :integer, - through: :string + through: :string, + tprof: :boolean, + tprof_limit: :integer ] ) @@ -91,6 +98,8 @@ defmodule Mix.Tasks.Bench do partition_count = Keyword.get(opts, :partition_count, @default_partition_count) max_messages = Keyword.get(opts, :max_messages) through = opts |> Keyword.get(:through, "full") |> String.to_existing_atom() + tprof? = Keyword.get(opts, :tprof, false) + tprof_limit = Keyword.get(opts, :tprof_limit, 60) if max_messages && duration_opt do Mix.raise("--duration and --max-messages are mutually exclusive") @@ -106,6 +115,11 @@ defmodule Mix.Tasks.Bench do # Start the application Mix.Task.run("app.start") + # Start tprof if requested + if tprof? do + start_tprof() + end + announce("#{@bold}=== Sequin Pipeline Benchmark ===#{@reset}", @cyan) IO.puts("") @@ -117,6 +131,7 @@ defmodule Mix.Tasks.Bench do IO.puts(" Partition count: #{partition_count}") IO.puts(" Max messages: #{max_messages || "unlimited"}") IO.puts(" Through: #{through}") + IO.puts(" tprof: #{tprof?}") IO.puts("") # Setup replication slot @@ -321,10 +336,76 @@ defmodule Mix.Tasks.Bench do pipeline_tracked ) + # Print tprof report if enabled + if tprof? do + stop_and_report_tprof(tprof_limit) + end + # Cleanup cleanup_entities(consumer, replication) end + defp start_tprof do + :tprof.start(%{type: :call_time}) + :tprof.enable_trace(:all) + + for {mod, _} <- :code.all_loaded(), + mod_str = Atom.to_string(mod), + String.starts_with?(mod_str, "Elixir.Sequin.") do + :tprof.set_pattern(mod, :_, :_) + end + + announce("tprof profiling enabled (tracing Sequin.* modules)", @yellow) + end + + defp stop_and_report_tprof(limit) do + :tprof.disable_trace(:all) + {:call_time, raw} = :tprof.collect() + :tprof.stop() + + grand_total = + raw + |> Enum.map(fn {_, _, _, pid_data} -> + pid_data |> Enum.map(fn {_, _, time} -> time end) |> Enum.sum() + end) + |> Enum.sum() + + rows = + raw + |> Enum.map(fn {mod, fun, arity, pid_data} -> + total_calls = pid_data |> Enum.map(fn {_, calls, _} -> calls end) |> Enum.sum() + total_time = pid_data |> Enum.map(fn {_, _, time} -> time end) |> Enum.sum() + {mod, fun, arity, total_calls, total_time} + end) + |> Enum.sort_by(fn {_, _, _, _, time} -> time end, :desc) + |> Enum.take(limit) + + IO.puts("") + announce("#{@bold}tprof Call Time Profile (top #{limit} functions):#{@reset}", @cyan) + IO.puts("") + + header = + " #{String.pad_trailing("FUNCTION", 70)} #{String.pad_leading("CALLS", 12)} #{String.pad_leading("TIME (ms)", 12)} #{String.pad_leading("% TOTAL", 9)}" + + IO.puts(header) + IO.puts(" #{String.duplicate("-", 105)}") + + Enum.each(rows, fn {mod, fun, arity, calls, time_us} -> + mfa = "#{inspect(mod)}.#{fun}/#{arity}" + time_ms = Float.round(time_us / 1000, 1) + pct = if grand_total > 0, do: Float.round(time_us / grand_total * 100, 1), else: 0.0 + + IO.puts( + " #{String.pad_trailing(mfa, 70)} #{String.pad_leading(format_number(calls), 12)} #{String.pad_leading(:erlang.float_to_binary(time_ms, decimals: 1), 12)} #{String.pad_leading(:erlang.float_to_binary(pct, decimals: 1) <> "%", 9)}" + ) + end) + + IO.puts("") + total_ms = Float.round(grand_total / 1000, 1) + IO.puts(" Total traced time: #{total_ms}ms") + IO.puts("") + end + defp cleanup_entities(_consumer, replication) do {:ok, database} = Databases.get_db(replication.postgres_database_id) {:ok, account} = Accounts.get_account(database.account_id)