From 52115b48d69251586db9d0b3eda977c2d1a1fcd0 Mon Sep 17 00:00:00 2001 From: "Tj (bougyman) Vanderpoel" Date: Fri, 1 Aug 2025 14:04:26 -0500 Subject: [PATCH 01/10] refactor: Lowers AbcSize for building workers --- Gemfile.lock | 2 +- lib/leopard/nats_api_server.rb | 38 ++++++++++++++++++++-------------- 2 files changed, 23 insertions(+), 17 deletions(-) diff --git a/Gemfile.lock b/Gemfile.lock index bd17c90..fee305e 100644 --- a/Gemfile.lock +++ b/Gemfile.lock @@ -1,7 +1,7 @@ PATH remote: . specs: - leopard (0.1.0) + leopard (0.1.5) concurrent-ruby (~> 1.1) dry-configurable (~> 1.3) dry-monads (~> 1.9) diff --git a/lib/leopard/nats_api_server.rb b/lib/leopard/nats_api_server.rb index 0d74088..b4b9648 100644 --- a/lib/leopard/nats_api_server.rb +++ b/lib/leopard/nats_api_server.rb @@ -18,6 +18,8 @@ def self.included(base) base.include(SemanticLogger::Loggable) end + Endpoint = Struct.new(:name, :subject, :queue, :group, :handler) + module ClassMethods def endpoints = @endpoints ||= [] def groups = @groups ||= {} @@ -33,13 +35,7 @@ def middleware = @middleware ||= [] # # @return [void] def endpoint(name, subject: nil, queue: nil, group: nil, &handler) - endpoints << { - name:, - subject: subject || name, - queue:, - group:, - handler:, - } + endpoints << Endpoint.new(name:, subject: subject || name, queue:, group:, handler:) end # Define a group for organizing endpoints. @@ -160,15 +156,25 @@ def build_group(service, defs, cache, name) # @return [void] def add_endpoints(service, endpoints, group_map) endpoints.each do |ep| - parent = ep[:group] ? group_map[ep[:group]] : service - raise ArgumentError, "Group #{ep[:group]} not defined" if ep[:group] && parent.nil? - - parent.endpoints.add( - ep[:name], subject: ep[:subject], queue: ep[:queue] - ) do |raw_msg| - wrapper = MessageWrapper.new(raw_msg) - dispatch_with_middleware(wrapper, ep[:handler]) - end + grp = ep[:grp] + parent = grp ? group_map[grp] : service + raise ArgumentError, "Group #{grp} not defined" if grp && parent.nil? + + build_endpoint(parent, ep) + end + end + + # Builds an endpoint in the NATS service. + # + # @param parent [NATS::Group] The parent group or service to add the endpoint to. + # @param ept [Endpoint] The endpoint definition containing name, subject, queue, and handler. + # NOTE: Named ept because `endpoint` is a DSL method we expose, to avoid confusion. + # + # @return [void] + def build_endpoint(parent, ept) + parent.endpoints.add(ept.name, subject: ept.subject, queue: ept.queue) do |raw_msg| + wrapper = MessageWrapper.new(raw_msg) + dispatch_with_middleware(wrapper, ept.handler) end end From d457604ccec9f5d94d009dc7417f74844459590f Mon Sep 17 00:00:00 2001 From: Bryce Rich Date: Fri, 1 Aug 2025 17:07:48 -0600 Subject: [PATCH 02/10] chore: move instance methods to an instance method module and update impacted tests --- lib/leopard/nats_api_server.rb | 39 +++++++++++-------- test/lib/nats_api_server.rb | 69 +++++++++++++++++++++++++--------- 2 files changed, 75 insertions(+), 33 deletions(-) diff --git a/lib/leopard/nats_api_server.rb b/lib/leopard/nats_api_server.rb index b4b9648..cf3d28f 100644 --- a/lib/leopard/nats_api_server.rb +++ b/lib/leopard/nats_api_server.rb @@ -14,6 +14,7 @@ module NatsApiServer def self.included(base) base.extend(ClassMethods) + base.include(InstanceMethods) base.extend(Dry::Monads[:result]) base.include(SemanticLogger::Loggable) end @@ -98,6 +99,14 @@ def spawn_instances(url, opts, count) pool end + def setup_worker(url, opts, eps, gps) + # Create an instance of the NATS client to act as a worker thread + worker = new + worker.setup_worker(url, opts, eps, gps) + end + end + + module InstanceMethods # Sets up a worker thread for the NATS API server. # This method connects to the NATS server, adds the service, groups, and endpoints, # and keeps the worker thread alive. @@ -109,55 +118,54 @@ def spawn_instances(url, opts, count) # # @return [void] def setup_worker(url, opts, eps, gps) - client = NATS.connect url - service = client.services.add(**opts) - group_map = add_groups(service, gps) - add_endpoints service, eps, group_map + @client = NATS.connect url + @service = @client.services.add(**opts) + group_map = add_groups(gps) + add_endpoints eps, group_map # Keep the worker thread alive sleep end + private + # Adds groups to the NATS service. # - # @param service [NATS::Service] The NATS service to add groups to. # @param gps [Hash] The groups to add, where keys are group names and values are group definitions. # # @return [Hash] A map of group names to their created group objects. - def add_groups(service, gps) + def add_groups(gps) created = {} - gps.each_key { |name| build_group(service, gps, created, name) } + gps.each_key { |name| build_group(gps, created, name) } created end # Builds a group in the NATS service. # - # @param service [NATS::Service] The NATS service to add the group to. # @param defs [Hash] The group definitions, where keys are group names and values are group definitions. # @param cache [Hash] A cache to store already created groups. # @param name [String] The name of the group to build. # # @return [NATS::Group] The created group object. - def build_group(service, defs, cache, name) + def build_group(defs, cache, name) return cache[name] if cache.key?(name) gdef = defs[name] raise ArgumentError, "Group #{name} not defined" unless gdef - parent = gdef[:parent] ? build_group(service, defs, cache, gdef[:parent]) : service + parent = gdef[:parent] ? build_group(defs, cache, gdef[:parent]) : @service cache[name] = parent.groups.add(gdef[:name], queue: gdef[:queue]) end # Adds endpoints to the NATS service. # - # @param service [NATS::Service] The NATS service to add endpoints to. # @param endpoints [Array] The list of endpoints to add. # @param group_map [Hash] A map of group names to their created group objects. # # @return [void] - def add_endpoints(service, endpoints, group_map) + def add_endpoints(endpoints, group_map) endpoints.each do |ep| - grp = ep[:grp] - parent = grp ? group_map[grp] : service + grp = ep.group + parent = grp ? group_map[grp] : @service raise ArgumentError, "Group #{grp} not defined" if grp && parent.nil? build_endpoint(parent, ep) @@ -186,7 +194,7 @@ def build_endpoint(parent, ept) # @return [void] def dispatch_with_middleware(wrapper, handler) app = ->(w) { handle_message(w.raw, handler) } - middleware.reverse_each do |(klass, args, blk)| + self.class.middleware.reverse_each do |(klass, args, blk)| app = klass.new(app, *args, &blk) end app.call(wrapper) @@ -209,7 +217,6 @@ def handle_message(raw_msg, handler) # Processes the result of the handler execution. # - # # @param wrapper [MessageWrapper] The message wrapper containing the raw message. # @param result [Dry::Monads::Result] The result of the handler execution. # diff --git a/test/lib/nats_api_server.rb b/test/lib/nats_api_server.rb index 21ef6ca..1fc732b 100755 --- a/test/lib/nats_api_server.rb +++ b/test/lib/nats_api_server.rb @@ -1,6 +1,6 @@ # frozen_string_literal: true -require 'helper' +require_relative '../helper' require Rubyists::Leopard.libroot / 'leopard/nats_api_server' describe 'Rubyists::Leopard::NatsApiServer' do # rubocop:disable Metrics/BlockLength @@ -9,6 +9,9 @@ include Rubyists::Leopard::NatsApiServer end + # Create an instance of the class to test instance methods + @instance = @klass.new + mod = Rubyists::Leopard::NatsApiServer cm = mod::ClassMethods cm.const_set(:Success, mod::Success) unless cm.const_defined?(:Success) @@ -19,16 +22,28 @@ blk = proc {} @klass.endpoint(:foo, &blk) - assert_equal [{ name: :foo, subject: :foo, queue: nil, group: nil, handler: blk }], - @klass.endpoints + assert_equal 1, @klass.endpoints.length + endpoint = @klass.endpoints.first + + assert_equal :foo, endpoint.name + assert_equal :foo, endpoint.subject + assert_nil endpoint.queue + assert_nil endpoint.group + assert_equal blk, endpoint.handler end it 'registers an endpoint with options' do blk = proc {} @klass.endpoint(:foo, subject: 'bar', queue: 'q', &blk) - assert_equal [{ name: :foo, subject: 'bar', queue: 'q', group: nil, handler: blk }], - @klass.endpoints + assert_equal 1, @klass.endpoints.length + endpoint = @klass.endpoints.first + + assert_equal :foo, endpoint.name + assert_equal 'bar', endpoint.subject + assert_equal 'q', endpoint.queue + assert_nil endpoint.group + assert_equal blk, endpoint.handler end it 'registers a group' do @@ -42,8 +57,14 @@ @klass.group :math @klass.endpoint(:add, group: :math, &blk) - assert_equal [{ name: :add, subject: :add, queue: nil, group: :math, handler: blk }], - @klass.endpoints + assert_equal 1, @klass.endpoints.length + endpoint = @klass.endpoints.first + + assert_equal :add, endpoint.name + assert_equal :add, endpoint.subject + assert_nil endpoint.queue + assert_equal :math, endpoint.group + assert_equal blk, endpoint.handler end it 'adds middleware' do @@ -53,7 +74,7 @@ assert_equal [[String, [1], blk]], @klass.middleware end - it 'dispatches through middleware in reverse order' do + it 'dispatches through middleware in reverse order' do # rubocop:disable Metrics/BlockLength order = [] mw1 = Class.new do def initialize(app) = (@app = app) @@ -71,12 +92,22 @@ def call(wrapper) @app.call(wrapper) end end - handler = ->(w) { w.log << :handler } - wrapper = Struct.new(:raw, :log).new(:raw, order) @klass.use mw1 @klass.use mw2 - @klass.stub(:handle_message, ->(_raw, h) { h.call(wrapper) }) do - @klass.send(:dispatch_with_middleware, wrapper, handler) + + @instance = @klass.new + + raw = Struct.new(:data, :header).new('raw_message', {}) + wrapper = Struct.new(:raw, :log).new(raw, order) + + handler = proc { |wrapper| + wrapper.log << :handler + Dry::Monads::Success(:ok) + } + @instance.stub(:process_result, ->(_wrapper, _result) {}) do + Rubyists::Leopard::MessageWrapper.stub(:new, wrapper) do + @instance.send(:dispatch_with_middleware, wrapper, handler) + end end assert_equal %i[mw1 mw2 handler], order @@ -92,9 +123,13 @@ def call(wrapper) result } processed = nil - @klass.stub(:process_result, ->(w, r) { processed = [w, r] }) do + + # Create an instance of the class to test instance methods after middleware is added + @instance = @klass.new + + @instance.stub(:process_result, ->(w, r) { processed = [w, r] }) do Rubyists::Leopard::MessageWrapper.stub(:new, wrapper) do - @klass.send(:handle_message, raw_msg, handler) + @instance.send(:handle_message, raw_msg, handler) end end @@ -107,7 +142,7 @@ def call(wrapper) wrapper = Minitest::Mock.new wrapper.expect(:respond_with_error, nil, ['boom']) Rubyists::Leopard::MessageWrapper.stub(:new, wrapper) do - @klass.send(:handle_message, raw_msg, proc { raise 'boom' }) + @instance.send(:handle_message, raw_msg, proc { raise 'boom' }) end wrapper.verify end @@ -116,7 +151,7 @@ def call(wrapper) wrapper = Minitest::Mock.new wrapper.expect(:respond, nil, ['ok']) result = Rubyists::Leopard::NatsApiServer::Success.new('ok') - @klass.send(:process_result, wrapper, result) + @instance.send(:process_result, wrapper, result) wrapper.verify end @@ -124,7 +159,7 @@ def call(wrapper) wrapper = Minitest::Mock.new wrapper.expect(:respond_with_error, nil, ['fail']) result = Rubyists::Leopard::NatsApiServer::Failure.new('fail') - @klass.send(:process_result, wrapper, result) + @instance.send(:process_result, wrapper, result) wrapper.verify end end From 4339597780281bace0cf1a483cefbcb4447da937 Mon Sep 17 00:00:00 2001 From: "Tj (bougyman) Vanderpoel" Date: Sat, 2 Aug 2025 10:19:56 -0500 Subject: [PATCH 03/10] feat: Adds Dry::Configurable extension, and makes #logger a configurable setting --- lib/leopard/nats_api_server.rb | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/lib/leopard/nats_api_server.rb b/lib/leopard/nats_api_server.rb index cf3d28f..db2dc26 100644 --- a/lib/leopard/nats_api_server.rb +++ b/lib/leopard/nats_api_server.rb @@ -16,7 +16,8 @@ def self.included(base) base.extend(ClassMethods) base.include(InstanceMethods) base.extend(Dry::Monads[:result]) - base.include(SemanticLogger::Loggable) + base.extend(Dry::Configurable) + base.setting :logger, default: Rubyists::Leopard.logger, reader: true end Endpoint = Struct.new(:name, :subject, :queue, :group, :handler) From 6b2cac02c34df8538c391086e2552fa727a1c095 Mon Sep 17 00:00:00 2001 From: "Tj (bougyman) Vanderpoel" Date: Sat, 2 Aug 2025 11:16:03 -0500 Subject: [PATCH 04/10] feat: Adds graceful termination of workers --- lib/leopard/nats_api_server.rb | 52 ++++++++++++++++++++++++++++------ 1 file changed, 44 insertions(+), 8 deletions(-) diff --git a/lib/leopard/nats_api_server.rb b/lib/leopard/nats_api_server.rb index db2dc26..63c6f6e 100644 --- a/lib/leopard/nats_api_server.rb +++ b/lib/leopard/nats_api_server.rb @@ -2,6 +2,7 @@ require 'nats/client' require 'dry/monads' +require 'dry/configurable' require 'concurrent' require_relative '../leopard' require_relative 'message_wrapper' @@ -73,11 +74,11 @@ def use(klass, *args, &block) # @return [void] def run(nats_url:, service_opts:, instances: 1, blocking: true) logger.info 'Booting NATS API server...' - # Return the thread pool if non-blocking - pool = spawn_instances(nats_url, service_opts, instances) + workers = Concurrent::Array.new + pool = spawn_instances(nats_url, service_opts, instances, workers) + trap_signals(workers, pool) return pool unless blocking - # Otherwise, just sleep the main thread forever sleep end @@ -90,24 +91,51 @@ def run(nats_url:, service_opts:, instances: 1, blocking: true) # @param count [Integer] The number of instances to spawn. # # @return [Concurrent::FixedThreadPool] The thread pool managing the worker threads. - def spawn_instances(url, opts, count) + def spawn_instances(url, opts, count, workers) pool = Concurrent::FixedThreadPool.new(count) count.times do eps = endpoints.dup gps = groups.dup - pool.post { setup_worker(url, opts, eps, gps) } + pool.post { build_worker(url, opts, eps, gps, workers) } end pool end - def setup_worker(url, opts, eps, gps) - # Create an instance of the NATS client to act as a worker thread + def build_worker(url, opts, eps, gps, workers) worker = new + workers << worker worker.setup_worker(url, opts, eps, gps) end + + def trap_signals(workers, pool) + shutdown = lambda do + logger.warn 'Draining worker subscriptions...' + workers.each(&:stop) + logger.warn 'All workers stopped, shutting down pool...' + pool.shutdown + logger.warn 'Pool is shut down, waiting for termination!' + pool.wait_for_termination + logger.warn 'Bye bye!' + wake_main_thread + end + %w[INT TERM QUIT].each do |sig| + trap(sig) do + logger.warn "Received #{sig} signal, shutting down..." + Thread.new { shutdown.call } + end + end + end + + def wake_main_thread + Thread.main.wakeup + rescue ThreadError + nil + end end module InstanceMethods + def logger = self.class.config.logger + # Sets up a worker thread for the NATS API server. # This method connects to the NATS server, adds the service, groups, and endpoints, # and keeps the worker thread alive. @@ -119,14 +147,22 @@ module InstanceMethods # # @return [void] def setup_worker(url, opts, eps, gps) + @thread = Thread.current @client = NATS.connect url @service = @client.services.add(**opts) group_map = add_groups(gps) add_endpoints eps, group_map - # Keep the worker thread alive sleep end + def stop + @service&.stop + @client&.close + @thread&.wakeup + rescue ThreadError + nil + end + private # Adds groups to the NATS service. From ba8b6af4fd75e24fadf787f80c16eedf1880efbb Mon Sep 17 00:00:00 2001 From: "Tj (bougyman) Vanderpoel" Date: Sat, 2 Aug 2025 11:23:33 -0500 Subject: [PATCH 05/10] style: Corrects rubocop AbcSize offense --- lib/leopard/nats_api_server.rb | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/lib/leopard/nats_api_server.rb b/lib/leopard/nats_api_server.rb index 63c6f6e..f9edbfd 100644 --- a/lib/leopard/nats_api_server.rb +++ b/lib/leopard/nats_api_server.rb @@ -107,8 +107,8 @@ def build_worker(url, opts, eps, gps, workers) worker.setup_worker(url, opts, eps, gps) end - def trap_signals(workers, pool) - shutdown = lambda do + def shutdown(workers, pool) + lambda do logger.warn 'Draining worker subscriptions...' workers.each(&:stop) logger.warn 'All workers stopped, shutting down pool...' @@ -118,10 +118,13 @@ def trap_signals(workers, pool) logger.warn 'Bye bye!' wake_main_thread end + end + + def trap_signals(workers, pool) %w[INT TERM QUIT].each do |sig| trap(sig) do logger.warn "Received #{sig} signal, shutting down..." - Thread.new { shutdown.call } + Thread.new { shutdown(workers, pool).call } end end end From 155ed3806b335ae3d9ea551c67825843f5177989 Mon Sep 17 00:00:00 2001 From: "Tj (bougyman) Vanderpoel" Date: Sat, 2 Aug 2025 11:44:31 -0500 Subject: [PATCH 06/10] style: Separate shutdown method for clarity --- examples/echo_endpoint.rb | 5 ++++- lib/leopard/nats_api_server.rb | 6 +++--- 2 files changed, 7 insertions(+), 4 deletions(-) diff --git a/examples/echo_endpoint.rb b/examples/echo_endpoint.rb index 735fda9..2d600aa 100755 --- a/examples/echo_endpoint.rb +++ b/examples/echo_endpoint.rb @@ -13,7 +13,10 @@ class EchoService if __FILE__ == $PROGRAM_NAME EchoService.run( nats_url: 'nats://localhost:4222', - service_opts: { name: 'example.echo', version: '1.0.0' }, + service_opts: { + name: 'example.echo', + version: '1.0.0', + }, instances: 4, ) end diff --git a/lib/leopard/nats_api_server.rb b/lib/leopard/nats_api_server.rb index f9edbfd..b443a81 100644 --- a/lib/leopard/nats_api_server.rb +++ b/lib/leopard/nats_api_server.rb @@ -76,6 +76,7 @@ def run(nats_url:, service_opts:, instances: 1, blocking: true) logger.info 'Booting NATS API server...' workers = Concurrent::Array.new pool = spawn_instances(nats_url, service_opts, instances, workers) + logger.info 'Setting up signal trap...' trap_signals(workers, pool) return pool unless blocking @@ -102,7 +103,8 @@ def spawn_instances(url, opts, count, workers) end def build_worker(url, opts, eps, gps, workers) - worker = new + instance_args = opts.delete(:instance_args) + worker = instance_args ? new(*instance_args) : new workers << worker worker.setup_worker(url, opts, eps, gps) end @@ -137,8 +139,6 @@ def wake_main_thread end module InstanceMethods - def logger = self.class.config.logger - # Sets up a worker thread for the NATS API server. # This method connects to the NATS server, adds the service, groups, and endpoints, # and keeps the worker thread alive. From a2754308df44786f0df490d875dcdac98898e00b Mon Sep 17 00:00:00 2001 From: "Tj (bougyman) Vanderpoel" Date: Sat, 2 Aug 2025 11:48:10 -0500 Subject: [PATCH 07/10] fix: Adds logging of worker options --- lib/leopard/nats_api_server.rb | 1 + 1 file changed, 1 insertion(+) diff --git a/lib/leopard/nats_api_server.rb b/lib/leopard/nats_api_server.rb index b443a81..e30393f 100644 --- a/lib/leopard/nats_api_server.rb +++ b/lib/leopard/nats_api_server.rb @@ -94,6 +94,7 @@ def run(nats_url:, service_opts:, instances: 1, blocking: true) # @return [Concurrent::FixedThreadPool] The thread pool managing the worker threads. def spawn_instances(url, opts, count, workers) pool = Concurrent::FixedThreadPool.new(count) + logger.info "Building #{count} workers with options: #{opts.inspect}" count.times do eps = endpoints.dup gps = groups.dup From bede01de8077024b634ff81bcd371a2860841093 Mon Sep 17 00:00:00 2001 From: "Tj (bougyman) Vanderpoel" Date: Sat, 2 Aug 2025 12:07:36 -0500 Subject: [PATCH 08/10] fix: Makes #logger a method to keep the One True Logger pattern alive --- examples/echo_endpoint.rb | 5 +++++ lib/leopard/nats_api_server.rb | 38 +++++++++++++++++++++++++++++++--- 2 files changed, 40 insertions(+), 3 deletions(-) diff --git a/examples/echo_endpoint.rb b/examples/echo_endpoint.rb index 2d600aa..202b8ea 100755 --- a/examples/echo_endpoint.rb +++ b/examples/echo_endpoint.rb @@ -7,6 +7,10 @@ class EchoService include Rubyists::Leopard::NatsApiServer + def initialize(a_var = 1) + logger.info "EchoService initialized with a_var: #{a_var}" + end + endpoint(:echo) { |msg| Success(msg.data) } end @@ -16,6 +20,7 @@ class EchoService service_opts: { name: 'example.echo', version: '1.0.0', + instance_args: [2], }, instances: 4, ) diff --git a/lib/leopard/nats_api_server.rb b/lib/leopard/nats_api_server.rb index e30393f..e0a02ae 100644 --- a/lib/leopard/nats_api_server.rb +++ b/lib/leopard/nats_api_server.rb @@ -94,7 +94,8 @@ def run(nats_url:, service_opts:, instances: 1, blocking: true) # @return [Concurrent::FixedThreadPool] The thread pool managing the worker threads. def spawn_instances(url, opts, count, workers) pool = Concurrent::FixedThreadPool.new(count) - logger.info "Building #{count} workers with options: #{opts.inspect}" + @instance_args = opts.delete(:instance_args) || nil + logger.info "Building #{count} workers with options: #{opts.inspect}, instance_args: #{@instance_args}" count.times do eps = endpoints.dup gps = groups.dup @@ -103,13 +104,27 @@ def spawn_instances(url, opts, count, workers) pool end + # Builds a worker instance and sets it up with the NATS server. + # + # @param url [String] The URL of the NATS server. + # @param opts [Hash] Options for the NATS service. + # @param eps [Array] The list of endpoints to add. + # @param gps [Hash] The groups to add. + # @param workers [Array] The array to store worker instances. + # + # @return [void] def build_worker(url, opts, eps, gps, workers) - instance_args = opts.delete(:instance_args) - worker = instance_args ? new(*instance_args) : new + worker = @instance_args ? new(*@instance_args) : new workers << worker worker.setup_worker(url, opts, eps, gps) end + # Shuts down the NATS API server gracefully. + # + # @param workers [Array] The array of worker instances to stop. + # @param pool [Concurrent::FixedThreadPool] The thread pool managing the worker threads. + # + # @return [Proc] A lambda that performs the shutdown operations. def shutdown(workers, pool) lambda do logger.warn 'Draining worker subscriptions...' @@ -123,6 +138,12 @@ def shutdown(workers, pool) end end + # Sets up signal traps for graceful shutdown of the NATS API server. + # + # @param workers [Array] The array of worker instances to stop on signal. + # @param pool [Concurrent::FixedThreadPool] The thread pool managing the worker threads. + # + # @return [void] def trap_signals(workers, pool) %w[INT TERM QUIT].each do |sig| trap(sig) do @@ -132,6 +153,11 @@ def trap_signals(workers, pool) end end + # Wakes up the main thread to allow it to continue execution after the server is stopped. + # This is useful when the server is running in a blocking mode. + # If the main thread is not blocked, this method does nothing. + # + # @return [void] def wake_main_thread Thread.main.wakeup rescue ThreadError @@ -140,6 +166,11 @@ def wake_main_thread end module InstanceMethods + # Returns the logger configured for the NATS API server. + def logger + self.class.logger + end + # Sets up a worker thread for the NATS API server. # This method connects to the NATS server, adds the service, groups, and endpoints, # and keeps the worker thread alive. @@ -159,6 +190,7 @@ def setup_worker(url, opts, eps, gps) sleep end + # Stops the NATS API server worker. def stop @service&.stop @client&.close From b721c47312feabf10373ec1d14189580a9c9b543 Mon Sep 17 00:00:00 2001 From: "Tj (bougyman) Vanderpoel" Date: Sat, 2 Aug 2025 12:09:58 -0500 Subject: [PATCH 09/10] fix: Adds guard around signal trap defining --- lib/leopard/nats_api_server.rb | 3 +++ 1 file changed, 3 insertions(+) diff --git a/lib/leopard/nats_api_server.rb b/lib/leopard/nats_api_server.rb index e0a02ae..08fbd68 100644 --- a/lib/leopard/nats_api_server.rb +++ b/lib/leopard/nats_api_server.rb @@ -145,12 +145,15 @@ def shutdown(workers, pool) # # @return [void] def trap_signals(workers, pool) + return if @trapped + %w[INT TERM QUIT].each do |sig| trap(sig) do logger.warn "Received #{sig} signal, shutting down..." Thread.new { shutdown(workers, pool).call } end end + @trapped = true end # Wakes up the main thread to allow it to continue execution after the server is stopped. From 41cb0d14a14fd77f75f8a26a2bd855da1f5c54e3 Mon Sep 17 00:00:00 2001 From: "Tj (bougyman) Vanderpoel" Date: Sat, 2 Aug 2025 15:35:47 -0500 Subject: [PATCH 10/10] style: Cleaner syntax for the instance logger definition --- lib/leopard/nats_api_server.rb | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/lib/leopard/nats_api_server.rb b/lib/leopard/nats_api_server.rb index 08fbd68..f8a54d6 100644 --- a/lib/leopard/nats_api_server.rb +++ b/lib/leopard/nats_api_server.rb @@ -170,9 +170,7 @@ def wake_main_thread module InstanceMethods # Returns the logger configured for the NATS API server. - def logger - self.class.logger - end + def logger = self.class.logger # Sets up a worker thread for the NATS API server. # This method connects to the NATS server, adds the service, groups, and endpoints,