Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 26 additions & 0 deletions lib/mix/tasks/bench.ex
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ defmodule Mix.Tasks.Bench do
* `--partition-count` - Number of partitions (default: schedulers_online)
* `--max-messages` - Maximum messages to generate (default: unlimited, mutually exclusive with --duration)
* `--through` - Pipeline stage to run through: "full", "reorder_buffer", or "sps" (default: "full")
* `--profile` - Enable per-stage pipeline profiling (default: false)
* `--tprof` - Enable tprof call_time profiling of Sequin.* modules (default: false)
* `--tprof-limit` - Number of top functions to show in tprof report (default: 60)

Expand All @@ -40,13 +41,17 @@ defmodule Mix.Tasks.Bench do
# Run through SlotProcessorServer (includes message handler, stops before Broadway)
mix benchmark --through sps

# Run with per-stage pipeline profiling
mix benchmark --duration 30 --profile

# Run with tprof profiling to see which functions dominate wall time
mix benchmark --duration 30 --tprof
"""
use Mix.Task

alias Sequin.Accounts
alias Sequin.Benchmark.MessageHandler, as: BenchmarkMessageHandler
alias Sequin.Benchmark.Profiler
alias Sequin.Benchmark.Stats
alias Sequin.Consumers
alias Sequin.Databases
Expand Down Expand Up @@ -86,6 +91,7 @@ defmodule Mix.Tasks.Bench do
partition_count: :integer,
max_messages: :integer,
through: :string,
profile: :boolean,
tprof: :boolean,
tprof_limit: :integer
]
Expand All @@ -98,6 +104,7 @@ defmodule Mix.Tasks.Bench do
partition_count = Keyword.get(opts, :partition_count, @default_partition_count)
max_messages = Keyword.get(opts, :max_messages)
through = opts |> Keyword.get(:through, "full") |> String.to_existing_atom()
profile? = Keyword.get(opts, :profile, false)
tprof? = Keyword.get(opts, :tprof, false)
tprof_limit = Keyword.get(opts, :tprof_limit, 60)

Expand All @@ -115,6 +122,9 @@ defmodule Mix.Tasks.Bench do
# Start the application
Mix.Task.run("app.start")

# Initialize profiler if requested
if profile?, do: Profiler.init()

# Start tprof if requested
if tprof? do
start_tprof()
Expand All @@ -131,6 +141,7 @@ defmodule Mix.Tasks.Bench do
IO.puts(" Partition count: #{partition_count}")
IO.puts(" Max messages: #{max_messages || "unlimited"}")
IO.puts(" Through: #{through}")
IO.puts(" profile: #{profile?}")
IO.puts(" tprof: #{tprof?}")
IO.puts("")

Expand Down Expand Up @@ -336,12 +347,22 @@ defmodule Mix.Tasks.Bench do
pipeline_tracked
)

# Print profiling report
if profile? do
report = Profiler.report()

if Enum.any?(report) do
Profiler.format_report(report)
end
end

# Print tprof report if enabled
if tprof? do
stop_and_report_tprof(tprof_limit)
end

# Cleanup
if profile?, do: Profiler.cleanup()
cleanup_entities(consumer, replication)
end

Expand Down Expand Up @@ -529,6 +550,11 @@ defmodule Mix.Tasks.Bench do
byte_size: msg.byte_size,
created_at_us: extract_created_at(msg.message.fields)
})

if Profiler.enabled?() do
Profiler.checkpoint(msg.message.commit_lsn, msg.message.commit_idx, :sink_in)
Profiler.finalize_message(msg.message.commit_lsn, msg.message.commit_idx)
end
end)
end)

Expand Down
8 changes: 8 additions & 0 deletions lib/sequin/benchmark/message_handler.ex
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ defmodule Sequin.Benchmark.MessageHandler do

@behaviour Sequin.Runtime.MessageHandler

alias Sequin.Benchmark.Profiler
alias Sequin.Benchmark.Stats
alias Sequin.Runtime.SlotProcessor.Message

Expand Down Expand Up @@ -49,6 +50,13 @@ defmodule Sequin.Benchmark.MessageHandler do
end)
end)

if Profiler.enabled?() do
Enum.each(messages, fn %Message{} = msg ->
Profiler.checkpoint(msg.commit_lsn, msg.commit_idx, :sink_in)
Profiler.finalize_message(msg.commit_lsn, msg.commit_idx)
end)
end

{:ok, length(messages)}
end

Expand Down
Loading
Loading