Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 34 additions & 13 deletions lib/leopard/nats_api_server.rb
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ def use(klass, *args, &block)
def run(nats_url:, service_opts:, instances: 1, blocking: true)
logger.info 'Booting NATS API server...'
workers = Concurrent::Array.new
pool = spawn_instances(nats_url, service_opts, instances, workers)
pool = spawn_instances(nats_url, service_opts, instances, workers, blocking)
logger.info 'Setting up signal trap...'
trap_signals(workers, pool)
return pool unless blocking
Expand All @@ -90,16 +90,16 @@ def run(nats_url:, service_opts:, instances: 1, blocking: true)
# @param url [String] The URL of the NATS server.
# @param opts [Hash] Options for the NATS service.
# @param count [Integer] The number of instances to spawn.
# @param workers [Array] The array to store worker instances.
# @param blocking [Boolean] If false, does not block current thread after starting the server.
#
# @return [Concurrent::FixedThreadPool] The thread pool managing the worker threads.
def spawn_instances(url, opts, count, workers)
def spawn_instances(url, opts, count, workers, blocking)
pool = Concurrent::FixedThreadPool.new(count)
@instance_args = opts.delete(:instance_args) || nil
logger.info "Building #{count} workers with options: #{opts.inspect}, instance_args: #{@instance_args}"
count.times do
eps = endpoints.dup
gps = groups.dup
pool.post { build_worker(url, opts, eps, gps, workers) }
pool.post { build_worker(url, opts, workers, blocking) }
end
pool
end
Expand All @@ -108,15 +108,16 @@ def spawn_instances(url, opts, count, workers)
#
# @param url [String] The URL of the NATS server.
# @param opts [Hash] Options for the NATS service.
# @param eps [Array<Hash>] The list of endpoints to add.
# @param gps [Hash] The groups to add.
# @param workers [Array] The array to store worker instances.
# @param blocking [Boolean] If true, blocks the current thread until the worker is set up.
#
# @return [void]
def build_worker(url, opts, eps, gps, workers)
def build_worker(url, opts, workers, blocking)
worker = @instance_args ? new(*@instance_args) : new
workers << worker
worker.setup_worker(url, opts, eps, gps)
return worker.setup_worker!(nats_url: url, service_opts: opts) if blocking

worker.setup_worker(nats_url: url, service_opts: opts)
Comment on lines +118 to +120
Copy link

Copilot AI Aug 6, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The conditional logic is inverted. When blocking is true, the method calls setup_worker! which blocks with sleep, but then immediately returns, preventing the sleep from executing. This should be unless blocking or the logic should be restructured.

Suggested change
return worker.setup_worker!(nats_url: url, service_opts: opts) if blocking
worker.setup_worker(nats_url: url, service_opts: opts)
if blocking
worker.setup_worker!(nats_url: url, service_opts: opts)
else
worker.setup_worker(nats_url: url, service_opts: opts)
end

Copilot uses AI. Check for mistakes.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it got this wrong. Not sure why it thinks the sleep is prevented from executing

end

# Shuts down the NATS API server gracefully.
Expand Down Expand Up @@ -174,20 +175,28 @@ def logger = self.class.logger

# Sets up a worker thread for the NATS API server.
# This method connects to the NATS server, adds the service, groups, and endpoints,
# and keeps the worker thread alive.
#
# @param url [String] The URL of the NATS server.
# @param opts [Hash] Options for the NATS service.
# @param eps [Array<Hash>] The list of endpoints to add.
# @param gps [Hash] The groups to add.
#
# @return [void]
def setup_worker(url, opts, eps, gps)
def setup_worker(nats_url: 'nats://localhost:4222', service_opts: {})
@thread = Thread.current
@client = NATS.connect url
@service = @client.services.add(**opts)
@client = NATS.connect nats_url
@service = @client.services.add(build_service_opts(service_opts:))
Copy link

Copilot AI Aug 6, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The build_service_opts method call uses named parameter syntax but the method definition shows service_opts: parameter. This should be build_service_opts(service_opts: service_opts) for clarity.

Suggested change
@service = @client.services.add(build_service_opts(service_opts:))
@service = @client.services.add(build_service_opts(service_opts: service_opts))

Copilot uses AI. Check for mistakes.
gps = self.class.groups.dup
eps = self.class.endpoints.dup
group_map = add_groups(gps)
add_endpoints eps, group_map
end

# Sets up a worker thread for the NATS API server and blocks the current thread.
#
# @see #setup_worker
def setup_worker!(nats_url: 'nats://localhost:4222', service_opts: {})
setup_worker(nats_url:, service_opts:)
sleep
end

Expand All @@ -202,6 +211,18 @@ def stop

private

# Builds the service options for the NATS service.
#
# @param service_opts [Hash] Options for the NATS service.
#
# @return [Hash] The complete service options including name and version.
def build_service_opts(service_opts:)
{
name: self.class.name.split('::').join('.'),
version: '0.1.0',
}.merge(service_opts)
end

# Adds groups to the NATS service.
#
# @param gps [Hash] The groups to add, where keys are group names and values are group definitions.
Expand Down