From 04bb6b179e301b9622849f3665174c3a5a07f1af Mon Sep 17 00:00:00 2001 From: "Tj (bougyman) Vanderpoel" Date: Wed, 6 Aug 2025 13:31:41 -0500 Subject: [PATCH 1/2] fix: Allows non-blocking setup_worker method --- lib/leopard/nats_api_server.rb | 36 +++++++++++++++++++++++++--------- 1 file changed, 27 insertions(+), 9 deletions(-) diff --git a/lib/leopard/nats_api_server.rb b/lib/leopard/nats_api_server.rb index f8a54d6..07979a2 100644 --- a/lib/leopard/nats_api_server.rb +++ b/lib/leopard/nats_api_server.rb @@ -97,9 +97,7 @@ def spawn_instances(url, opts, count, workers) @instance_args = opts.delete(:instance_args) || nil logger.info "Building #{count} workers with options: #{opts.inspect}, instance_args: #{@instance_args}" count.times do - eps = endpoints.dup - gps = groups.dup - pool.post { build_worker(url, opts, eps, gps, workers) } + pool.post { build_worker(url, opts, workers) } end pool end @@ -113,10 +111,10 @@ def spawn_instances(url, opts, count, workers) # @param workers [Array] The array to store worker instances. # # @return [void] - def build_worker(url, opts, eps, gps, workers) + def build_worker(url, opts, workers) worker = @instance_args ? new(*@instance_args) : new workers << worker - worker.setup_worker(url, opts, eps, gps) + worker.setup_worker!(nats_url: url, service_opts: opts) end # Shuts down the NATS API server gracefully. @@ -174,7 +172,6 @@ def logger = self.class.logger # Sets up a worker thread for the NATS API server. # This method connects to the NATS server, adds the service, groups, and endpoints, - # and keeps the worker thread alive. # # @param url [String] The URL of the NATS server. # @param opts [Hash] Options for the NATS service. @@ -182,12 +179,21 @@ def logger = self.class.logger # @param gps [Hash] The groups to add. # # @return [void] - def setup_worker(url, opts, eps, gps) + def setup_worker(nats_url: 'nats://localhost:4222', service_opts: {}) @thread = Thread.current - @client = NATS.connect url - @service = @client.services.add(**opts) + @client = NATS.connect nats_url + @service = @client.services.add(build_service_opts(service_opts:)) + gps = self.class.groups.dup + eps = self.class.endpoints.dup group_map = add_groups(gps) add_endpoints eps, group_map + end + + # Sets up a worker thread for the NATS API server and blocks the current thread. + # + # @see #setup_worker + def setup_worker!(nats_url: 'nats://localhost:4222', service_opts: {}) + setup_worker(nats_url:, service_opts:) sleep end @@ -202,6 +208,18 @@ def stop private + # Builds the service options for the NATS service. + # + # @param service_opts [Hash] Options for the NATS service. + # + # @return [Hash] The complete service options including name and version. + def build_service_opts(service_opts:) + { + name: self.class.name.split('::').join('.'), + version: '0.1.0', + }.merge(service_opts) + end + # Adds groups to the NATS service. # # @param gps [Hash] The groups to add, where keys are group names and values are group definitions. From 237f26cb074d85a607f5e4e97deee1834f587d59 Mon Sep 17 00:00:00 2001 From: "Tj (bougyman) Vanderpoel" Date: Wed, 6 Aug 2025 13:41:01 -0500 Subject: [PATCH 2/2] fix: Ensure we do not block in non-blocking mode --- lib/leopard/nats_api_server.rb | 17 ++++++++++------- 1 file changed, 10 insertions(+), 7 deletions(-) diff --git a/lib/leopard/nats_api_server.rb b/lib/leopard/nats_api_server.rb index 07979a2..582ccdc 100644 --- a/lib/leopard/nats_api_server.rb +++ b/lib/leopard/nats_api_server.rb @@ -75,7 +75,7 @@ def use(klass, *args, &block) def run(nats_url:, service_opts:, instances: 1, blocking: true) logger.info 'Booting NATS API server...' workers = Concurrent::Array.new - pool = spawn_instances(nats_url, service_opts, instances, workers) + pool = spawn_instances(nats_url, service_opts, instances, workers, blocking) logger.info 'Setting up signal trap...' trap_signals(workers, pool) return pool unless blocking @@ -90,14 +90,16 @@ def run(nats_url:, service_opts:, instances: 1, blocking: true) # @param url [String] The URL of the NATS server. # @param opts [Hash] Options for the NATS service. # @param count [Integer] The number of instances to spawn. + # @param workers [Array] The array to store worker instances. + # @param blocking [Boolean] If false, does not block current thread after starting the server. # # @return [Concurrent::FixedThreadPool] The thread pool managing the worker threads. - def spawn_instances(url, opts, count, workers) + def spawn_instances(url, opts, count, workers, blocking) pool = Concurrent::FixedThreadPool.new(count) @instance_args = opts.delete(:instance_args) || nil logger.info "Building #{count} workers with options: #{opts.inspect}, instance_args: #{@instance_args}" count.times do - pool.post { build_worker(url, opts, workers) } + pool.post { build_worker(url, opts, workers, blocking) } end pool end @@ -106,15 +108,16 @@ def spawn_instances(url, opts, count, workers) # # @param url [String] The URL of the NATS server. # @param opts [Hash] Options for the NATS service. - # @param eps [Array] The list of endpoints to add. - # @param gps [Hash] The groups to add. # @param workers [Array] The array to store worker instances. + # @param blocking [Boolean] If true, blocks the current thread until the worker is set up. # # @return [void] - def build_worker(url, opts, workers) + def build_worker(url, opts, workers, blocking) worker = @instance_args ? new(*@instance_args) : new workers << worker - worker.setup_worker!(nats_url: url, service_opts: opts) + return worker.setup_worker!(nats_url: url, service_opts: opts) if blocking + + worker.setup_worker(nats_url: url, service_opts: opts) end # Shuts down the NATS API server gracefully.