Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Gemfile.lock
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
PATH
remote: .
specs:
leopard (0.1.0)
leopard (0.1.5)
concurrent-ruby (~> 1.1)
dry-configurable (~> 1.3)
dry-monads (~> 1.9)
Expand Down
10 changes: 9 additions & 1 deletion examples/echo_endpoint.rb
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,21 @@
class EchoService
include Rubyists::Leopard::NatsApiServer

def initialize(a_var = 1)
logger.info "EchoService initialized with a_var: #{a_var}"
end

endpoint(:echo) { |msg| Success(msg.data) }
end

if __FILE__ == $PROGRAM_NAME
EchoService.run(
nats_url: 'nats://localhost:4222',
service_opts: { name: 'example.echo', version: '1.0.0' },
service_opts: {
name: 'example.echo',
version: '1.0.0',
instance_args: [2],
},
instances: 4,
)
end
161 changes: 124 additions & 37 deletions lib/leopard/nats_api_server.rb
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

require 'nats/client'
require 'dry/monads'
require 'dry/configurable'
require 'concurrent'
require_relative '../leopard'
require_relative 'message_wrapper'
Expand All @@ -14,10 +15,14 @@ module NatsApiServer

def self.included(base)
base.extend(ClassMethods)
base.include(InstanceMethods)
base.extend(Dry::Monads[:result])
base.include(SemanticLogger::Loggable)
base.extend(Dry::Configurable)
base.setting :logger, default: Rubyists::Leopard.logger, reader: true
end

Endpoint = Struct.new(:name, :subject, :queue, :group, :handler)

module ClassMethods
def endpoints = @endpoints ||= []
def groups = @groups ||= {}
Expand All @@ -33,13 +38,7 @@ def middleware = @middleware ||= []
#
# @return [void]
def endpoint(name, subject: nil, queue: nil, group: nil, &handler)
endpoints << {
name:,
subject: subject || name,
queue:,
group:,
handler:,
}
endpoints << Endpoint.new(name:, subject: subject || name, queue:, group:, handler:)
end

# Define a group for organizing endpoints.
Expand Down Expand Up @@ -75,11 +74,12 @@ def use(klass, *args, &block)
# @return [void]
def run(nats_url:, service_opts:, instances: 1, blocking: true)
logger.info 'Booting NATS API server...'
# Return the thread pool if non-blocking
pool = spawn_instances(nats_url, service_opts, instances)
workers = Concurrent::Array.new
pool = spawn_instances(nats_url, service_opts, instances, workers)
logger.info 'Setting up signal trap...'
trap_signals(workers, pool)
return pool unless blocking

# Otherwise, just sleep the main thread forever
sleep
end

Expand All @@ -92,16 +92,86 @@ def run(nats_url:, service_opts:, instances: 1, blocking: true)
# @param count [Integer] The number of instances to spawn.
#
# @return [Concurrent::FixedThreadPool] The thread pool managing the worker threads.
def spawn_instances(url, opts, count)
def spawn_instances(url, opts, count, workers)
pool = Concurrent::FixedThreadPool.new(count)
@instance_args = opts.delete(:instance_args) || nil
logger.info "Building #{count} workers with options: #{opts.inspect}, instance_args: #{@instance_args}"
count.times do
eps = endpoints.dup
gps = groups.dup
pool.post { setup_worker(url, opts, eps, gps) }
pool.post { build_worker(url, opts, eps, gps, workers) }
end
pool
end

# Builds a worker instance and sets it up with the NATS server.
#
# @param url [String] The URL of the NATS server.
# @param opts [Hash] Options for the NATS service.
# @param eps [Array<Hash>] The list of endpoints to add.
# @param gps [Hash] The groups to add.
# @param workers [Array] The array to store worker instances.
#
# @return [void]
def build_worker(url, opts, eps, gps, workers)
worker = @instance_args ? new(*@instance_args) : new
workers << worker
worker.setup_worker(url, opts, eps, gps)
end

# Shuts down the NATS API server gracefully.
#
# @param workers [Array] The array of worker instances to stop.
# @param pool [Concurrent::FixedThreadPool] The thread pool managing the worker threads.
#
# @return [Proc] A lambda that performs the shutdown operations.
def shutdown(workers, pool)
lambda do
logger.warn 'Draining worker subscriptions...'
workers.each(&:stop)
logger.warn 'All workers stopped, shutting down pool...'
pool.shutdown
logger.warn 'Pool is shut down, waiting for termination!'
pool.wait_for_termination
logger.warn 'Bye bye!'
wake_main_thread
end
end

# Sets up signal traps for graceful shutdown of the NATS API server.
#
# @param workers [Array] The array of worker instances to stop on signal.
# @param pool [Concurrent::FixedThreadPool] The thread pool managing the worker threads.
#
# @return [void]
def trap_signals(workers, pool)
return if @trapped

%w[INT TERM QUIT].each do |sig|
trap(sig) do
logger.warn "Received #{sig} signal, shutting down..."
Thread.new { shutdown(workers, pool).call }
end
end
@trapped = true
end

# Wakes up the main thread to allow it to continue execution after the server is stopped.
# This is useful when the server is running in a blocking mode.
# If the main thread is not blocked, this method does nothing.
#
# @return [void]
def wake_main_thread
Thread.main.wakeup
rescue ThreadError
nil
end
end

module InstanceMethods
# Returns the logger configured for the NATS API server.
def logger = self.class.logger

# Sets up a worker thread for the NATS API server.
# This method connects to the NATS server, adds the service, groups, and endpoints,
# and keeps the worker thread alive.
Expand All @@ -113,62 +183,80 @@ def spawn_instances(url, opts, count)
#
# @return [void]
def setup_worker(url, opts, eps, gps)
client = NATS.connect url
service = client.services.add(**opts)
group_map = add_groups(service, gps)
add_endpoints service, eps, group_map
# Keep the worker thread alive
@thread = Thread.current
@client = NATS.connect url
@service = @client.services.add(**opts)
group_map = add_groups(gps)
add_endpoints eps, group_map
sleep
end

# Stops the NATS API server worker.
def stop
@service&.stop
@client&.close
@thread&.wakeup
rescue ThreadError
nil
end

private

# Adds groups to the NATS service.
#
# @param service [NATS::Service] The NATS service to add groups to.
# @param gps [Hash] The groups to add, where keys are group names and values are group definitions.
#
# @return [Hash] A map of group names to their created group objects.
def add_groups(service, gps)
def add_groups(gps)
created = {}
gps.each_key { |name| build_group(service, gps, created, name) }
gps.each_key { |name| build_group(gps, created, name) }
created
end

# Builds a group in the NATS service.
#
# @param service [NATS::Service] The NATS service to add the group to.
# @param defs [Hash] The group definitions, where keys are group names and values are group definitions.
# @param cache [Hash] A cache to store already created groups.
# @param name [String] The name of the group to build.
#
# @return [NATS::Group] The created group object.
def build_group(service, defs, cache, name)
def build_group(defs, cache, name)
return cache[name] if cache.key?(name)

gdef = defs[name]
raise ArgumentError, "Group #{name} not defined" unless gdef

parent = gdef[:parent] ? build_group(service, defs, cache, gdef[:parent]) : service
parent = gdef[:parent] ? build_group(defs, cache, gdef[:parent]) : @service
cache[name] = parent.groups.add(gdef[:name], queue: gdef[:queue])
end

# Adds endpoints to the NATS service.
#
# @param service [NATS::Service] The NATS service to add endpoints to.
# @param endpoints [Array<Hash>] The list of endpoints to add.
# @param group_map [Hash] A map of group names to their created group objects.
#
# @return [void]
def add_endpoints(service, endpoints, group_map)
def add_endpoints(endpoints, group_map)
endpoints.each do |ep|
parent = ep[:group] ? group_map[ep[:group]] : service
raise ArgumentError, "Group #{ep[:group]} not defined" if ep[:group] && parent.nil?

parent.endpoints.add(
ep[:name], subject: ep[:subject], queue: ep[:queue]
) do |raw_msg|
wrapper = MessageWrapper.new(raw_msg)
dispatch_with_middleware(wrapper, ep[:handler])
end
grp = ep.group
parent = grp ? group_map[grp] : @service
raise ArgumentError, "Group #{grp} not defined" if grp && parent.nil?

build_endpoint(parent, ep)
end
end

# Builds an endpoint in the NATS service.
#
# @param parent [NATS::Group] The parent group or service to add the endpoint to.
# @param ept [Endpoint] The endpoint definition containing name, subject, queue, and handler.
# NOTE: Named ept because `endpoint` is a DSL method we expose, to avoid confusion.
#
# @return [void]
def build_endpoint(parent, ept)
parent.endpoints.add(ept.name, subject: ept.subject, queue: ept.queue) do |raw_msg|
wrapper = MessageWrapper.new(raw_msg)
dispatch_with_middleware(wrapper, ept.handler)
end
end

Expand All @@ -180,7 +268,7 @@ def add_endpoints(service, endpoints, group_map)
# @return [void]
def dispatch_with_middleware(wrapper, handler)
app = ->(w) { handle_message(w.raw, handler) }
middleware.reverse_each do |(klass, args, blk)|
self.class.middleware.reverse_each do |(klass, args, blk)|
app = klass.new(app, *args, &blk)
end
app.call(wrapper)
Expand All @@ -203,7 +291,6 @@ def handle_message(raw_msg, handler)

# Processes the result of the handler execution.
#
#
# @param wrapper [MessageWrapper] The message wrapper containing the raw message.
# @param result [Dry::Monads::Result] The result of the handler execution.
#
Expand Down
Loading