diff --git a/Gemfile.lock b/Gemfile.lock index bd17c90..fee305e 100644 --- a/Gemfile.lock +++ b/Gemfile.lock @@ -1,7 +1,7 @@ PATH remote: . specs: - leopard (0.1.0) + leopard (0.1.5) concurrent-ruby (~> 1.1) dry-configurable (~> 1.3) dry-monads (~> 1.9) diff --git a/examples/echo_endpoint.rb b/examples/echo_endpoint.rb index 735fda9..202b8ea 100755 --- a/examples/echo_endpoint.rb +++ b/examples/echo_endpoint.rb @@ -7,13 +7,21 @@ class EchoService include Rubyists::Leopard::NatsApiServer + def initialize(a_var = 1) + logger.info "EchoService initialized with a_var: #{a_var}" + end + endpoint(:echo) { |msg| Success(msg.data) } end if __FILE__ == $PROGRAM_NAME EchoService.run( nats_url: 'nats://localhost:4222', - service_opts: { name: 'example.echo', version: '1.0.0' }, + service_opts: { + name: 'example.echo', + version: '1.0.0', + instance_args: [2], + }, instances: 4, ) end diff --git a/lib/leopard/nats_api_server.rb b/lib/leopard/nats_api_server.rb index 0d74088..f8a54d6 100644 --- a/lib/leopard/nats_api_server.rb +++ b/lib/leopard/nats_api_server.rb @@ -2,6 +2,7 @@ require 'nats/client' require 'dry/monads' +require 'dry/configurable' require 'concurrent' require_relative '../leopard' require_relative 'message_wrapper' @@ -14,10 +15,14 @@ module NatsApiServer def self.included(base) base.extend(ClassMethods) + base.include(InstanceMethods) base.extend(Dry::Monads[:result]) - base.include(SemanticLogger::Loggable) + base.extend(Dry::Configurable) + base.setting :logger, default: Rubyists::Leopard.logger, reader: true end + Endpoint = Struct.new(:name, :subject, :queue, :group, :handler) + module ClassMethods def endpoints = @endpoints ||= [] def groups = @groups ||= {} @@ -33,13 +38,7 @@ def middleware = @middleware ||= [] # # @return [void] def endpoint(name, subject: nil, queue: nil, group: nil, &handler) - endpoints << { - name:, - subject: subject || name, - queue:, - group:, - handler:, - } + endpoints << Endpoint.new(name:, subject: subject || name, queue:, group:, handler:) end # Define a group for organizing endpoints. @@ -75,11 +74,12 @@ def use(klass, *args, &block) # @return [void] def run(nats_url:, service_opts:, instances: 1, blocking: true) logger.info 'Booting NATS API server...' - # Return the thread pool if non-blocking - pool = spawn_instances(nats_url, service_opts, instances) + workers = Concurrent::Array.new + pool = spawn_instances(nats_url, service_opts, instances, workers) + logger.info 'Setting up signal trap...' + trap_signals(workers, pool) return pool unless blocking - # Otherwise, just sleep the main thread forever sleep end @@ -92,16 +92,86 @@ def run(nats_url:, service_opts:, instances: 1, blocking: true) # @param count [Integer] The number of instances to spawn. # # @return [Concurrent::FixedThreadPool] The thread pool managing the worker threads. - def spawn_instances(url, opts, count) + def spawn_instances(url, opts, count, workers) pool = Concurrent::FixedThreadPool.new(count) + @instance_args = opts.delete(:instance_args) || nil + logger.info "Building #{count} workers with options: #{opts.inspect}, instance_args: #{@instance_args}" count.times do eps = endpoints.dup gps = groups.dup - pool.post { setup_worker(url, opts, eps, gps) } + pool.post { build_worker(url, opts, eps, gps, workers) } end pool end + # Builds a worker instance and sets it up with the NATS server. + # + # @param url [String] The URL of the NATS server. + # @param opts [Hash] Options for the NATS service. + # @param eps [Array] The list of endpoints to add. + # @param gps [Hash] The groups to add. + # @param workers [Array] The array to store worker instances. + # + # @return [void] + def build_worker(url, opts, eps, gps, workers) + worker = @instance_args ? new(*@instance_args) : new + workers << worker + worker.setup_worker(url, opts, eps, gps) + end + + # Shuts down the NATS API server gracefully. + # + # @param workers [Array] The array of worker instances to stop. + # @param pool [Concurrent::FixedThreadPool] The thread pool managing the worker threads. + # + # @return [Proc] A lambda that performs the shutdown operations. + def shutdown(workers, pool) + lambda do + logger.warn 'Draining worker subscriptions...' + workers.each(&:stop) + logger.warn 'All workers stopped, shutting down pool...' + pool.shutdown + logger.warn 'Pool is shut down, waiting for termination!' + pool.wait_for_termination + logger.warn 'Bye bye!' + wake_main_thread + end + end + + # Sets up signal traps for graceful shutdown of the NATS API server. + # + # @param workers [Array] The array of worker instances to stop on signal. + # @param pool [Concurrent::FixedThreadPool] The thread pool managing the worker threads. + # + # @return [void] + def trap_signals(workers, pool) + return if @trapped + + %w[INT TERM QUIT].each do |sig| + trap(sig) do + logger.warn "Received #{sig} signal, shutting down..." + Thread.new { shutdown(workers, pool).call } + end + end + @trapped = true + end + + # Wakes up the main thread to allow it to continue execution after the server is stopped. + # This is useful when the server is running in a blocking mode. + # If the main thread is not blocked, this method does nothing. + # + # @return [void] + def wake_main_thread + Thread.main.wakeup + rescue ThreadError + nil + end + end + + module InstanceMethods + # Returns the logger configured for the NATS API server. + def logger = self.class.logger + # Sets up a worker thread for the NATS API server. # This method connects to the NATS server, adds the service, groups, and endpoints, # and keeps the worker thread alive. @@ -113,62 +183,80 @@ def spawn_instances(url, opts, count) # # @return [void] def setup_worker(url, opts, eps, gps) - client = NATS.connect url - service = client.services.add(**opts) - group_map = add_groups(service, gps) - add_endpoints service, eps, group_map - # Keep the worker thread alive + @thread = Thread.current + @client = NATS.connect url + @service = @client.services.add(**opts) + group_map = add_groups(gps) + add_endpoints eps, group_map sleep end + # Stops the NATS API server worker. + def stop + @service&.stop + @client&.close + @thread&.wakeup + rescue ThreadError + nil + end + + private + # Adds groups to the NATS service. # - # @param service [NATS::Service] The NATS service to add groups to. # @param gps [Hash] The groups to add, where keys are group names and values are group definitions. # # @return [Hash] A map of group names to their created group objects. - def add_groups(service, gps) + def add_groups(gps) created = {} - gps.each_key { |name| build_group(service, gps, created, name) } + gps.each_key { |name| build_group(gps, created, name) } created end # Builds a group in the NATS service. # - # @param service [NATS::Service] The NATS service to add the group to. # @param defs [Hash] The group definitions, where keys are group names and values are group definitions. # @param cache [Hash] A cache to store already created groups. # @param name [String] The name of the group to build. # # @return [NATS::Group] The created group object. - def build_group(service, defs, cache, name) + def build_group(defs, cache, name) return cache[name] if cache.key?(name) gdef = defs[name] raise ArgumentError, "Group #{name} not defined" unless gdef - parent = gdef[:parent] ? build_group(service, defs, cache, gdef[:parent]) : service + parent = gdef[:parent] ? build_group(defs, cache, gdef[:parent]) : @service cache[name] = parent.groups.add(gdef[:name], queue: gdef[:queue]) end # Adds endpoints to the NATS service. # - # @param service [NATS::Service] The NATS service to add endpoints to. # @param endpoints [Array] The list of endpoints to add. # @param group_map [Hash] A map of group names to their created group objects. # # @return [void] - def add_endpoints(service, endpoints, group_map) + def add_endpoints(endpoints, group_map) endpoints.each do |ep| - parent = ep[:group] ? group_map[ep[:group]] : service - raise ArgumentError, "Group #{ep[:group]} not defined" if ep[:group] && parent.nil? - - parent.endpoints.add( - ep[:name], subject: ep[:subject], queue: ep[:queue] - ) do |raw_msg| - wrapper = MessageWrapper.new(raw_msg) - dispatch_with_middleware(wrapper, ep[:handler]) - end + grp = ep.group + parent = grp ? group_map[grp] : @service + raise ArgumentError, "Group #{grp} not defined" if grp && parent.nil? + + build_endpoint(parent, ep) + end + end + + # Builds an endpoint in the NATS service. + # + # @param parent [NATS::Group] The parent group or service to add the endpoint to. + # @param ept [Endpoint] The endpoint definition containing name, subject, queue, and handler. + # NOTE: Named ept because `endpoint` is a DSL method we expose, to avoid confusion. + # + # @return [void] + def build_endpoint(parent, ept) + parent.endpoints.add(ept.name, subject: ept.subject, queue: ept.queue) do |raw_msg| + wrapper = MessageWrapper.new(raw_msg) + dispatch_with_middleware(wrapper, ept.handler) end end @@ -180,7 +268,7 @@ def add_endpoints(service, endpoints, group_map) # @return [void] def dispatch_with_middleware(wrapper, handler) app = ->(w) { handle_message(w.raw, handler) } - middleware.reverse_each do |(klass, args, blk)| + self.class.middleware.reverse_each do |(klass, args, blk)| app = klass.new(app, *args, &blk) end app.call(wrapper) @@ -203,7 +291,6 @@ def handle_message(raw_msg, handler) # Processes the result of the handler execution. # - # # @param wrapper [MessageWrapper] The message wrapper containing the raw message. # @param result [Dry::Monads::Result] The result of the handler execution. # diff --git a/test/lib/nats_api_server.rb b/test/lib/nats_api_server.rb index 21ef6ca..1fc732b 100755 --- a/test/lib/nats_api_server.rb +++ b/test/lib/nats_api_server.rb @@ -1,6 +1,6 @@ # frozen_string_literal: true -require 'helper' +require_relative '../helper' require Rubyists::Leopard.libroot / 'leopard/nats_api_server' describe 'Rubyists::Leopard::NatsApiServer' do # rubocop:disable Metrics/BlockLength @@ -9,6 +9,9 @@ include Rubyists::Leopard::NatsApiServer end + # Create an instance of the class to test instance methods + @instance = @klass.new + mod = Rubyists::Leopard::NatsApiServer cm = mod::ClassMethods cm.const_set(:Success, mod::Success) unless cm.const_defined?(:Success) @@ -19,16 +22,28 @@ blk = proc {} @klass.endpoint(:foo, &blk) - assert_equal [{ name: :foo, subject: :foo, queue: nil, group: nil, handler: blk }], - @klass.endpoints + assert_equal 1, @klass.endpoints.length + endpoint = @klass.endpoints.first + + assert_equal :foo, endpoint.name + assert_equal :foo, endpoint.subject + assert_nil endpoint.queue + assert_nil endpoint.group + assert_equal blk, endpoint.handler end it 'registers an endpoint with options' do blk = proc {} @klass.endpoint(:foo, subject: 'bar', queue: 'q', &blk) - assert_equal [{ name: :foo, subject: 'bar', queue: 'q', group: nil, handler: blk }], - @klass.endpoints + assert_equal 1, @klass.endpoints.length + endpoint = @klass.endpoints.first + + assert_equal :foo, endpoint.name + assert_equal 'bar', endpoint.subject + assert_equal 'q', endpoint.queue + assert_nil endpoint.group + assert_equal blk, endpoint.handler end it 'registers a group' do @@ -42,8 +57,14 @@ @klass.group :math @klass.endpoint(:add, group: :math, &blk) - assert_equal [{ name: :add, subject: :add, queue: nil, group: :math, handler: blk }], - @klass.endpoints + assert_equal 1, @klass.endpoints.length + endpoint = @klass.endpoints.first + + assert_equal :add, endpoint.name + assert_equal :add, endpoint.subject + assert_nil endpoint.queue + assert_equal :math, endpoint.group + assert_equal blk, endpoint.handler end it 'adds middleware' do @@ -53,7 +74,7 @@ assert_equal [[String, [1], blk]], @klass.middleware end - it 'dispatches through middleware in reverse order' do + it 'dispatches through middleware in reverse order' do # rubocop:disable Metrics/BlockLength order = [] mw1 = Class.new do def initialize(app) = (@app = app) @@ -71,12 +92,22 @@ def call(wrapper) @app.call(wrapper) end end - handler = ->(w) { w.log << :handler } - wrapper = Struct.new(:raw, :log).new(:raw, order) @klass.use mw1 @klass.use mw2 - @klass.stub(:handle_message, ->(_raw, h) { h.call(wrapper) }) do - @klass.send(:dispatch_with_middleware, wrapper, handler) + + @instance = @klass.new + + raw = Struct.new(:data, :header).new('raw_message', {}) + wrapper = Struct.new(:raw, :log).new(raw, order) + + handler = proc { |wrapper| + wrapper.log << :handler + Dry::Monads::Success(:ok) + } + @instance.stub(:process_result, ->(_wrapper, _result) {}) do + Rubyists::Leopard::MessageWrapper.stub(:new, wrapper) do + @instance.send(:dispatch_with_middleware, wrapper, handler) + end end assert_equal %i[mw1 mw2 handler], order @@ -92,9 +123,13 @@ def call(wrapper) result } processed = nil - @klass.stub(:process_result, ->(w, r) { processed = [w, r] }) do + + # Create an instance of the class to test instance methods after middleware is added + @instance = @klass.new + + @instance.stub(:process_result, ->(w, r) { processed = [w, r] }) do Rubyists::Leopard::MessageWrapper.stub(:new, wrapper) do - @klass.send(:handle_message, raw_msg, handler) + @instance.send(:handle_message, raw_msg, handler) end end @@ -107,7 +142,7 @@ def call(wrapper) wrapper = Minitest::Mock.new wrapper.expect(:respond_with_error, nil, ['boom']) Rubyists::Leopard::MessageWrapper.stub(:new, wrapper) do - @klass.send(:handle_message, raw_msg, proc { raise 'boom' }) + @instance.send(:handle_message, raw_msg, proc { raise 'boom' }) end wrapper.verify end @@ -116,7 +151,7 @@ def call(wrapper) wrapper = Minitest::Mock.new wrapper.expect(:respond, nil, ['ok']) result = Rubyists::Leopard::NatsApiServer::Success.new('ok') - @klass.send(:process_result, wrapper, result) + @instance.send(:process_result, wrapper, result) wrapper.verify end @@ -124,7 +159,7 @@ def call(wrapper) wrapper = Minitest::Mock.new wrapper.expect(:respond_with_error, nil, ['fail']) result = Rubyists::Leopard::NatsApiServer::Failure.new('fail') - @klass.send(:process_result, wrapper, result) + @instance.send(:process_result, wrapper, result) wrapper.verify end end