From 5d4dfb93d7711c8640b850a575a00cdcadf52fb8 Mon Sep 17 00:00:00 2001 From: Marek Wysinski Date: Mon, 21 Nov 2022 10:15:02 +0000 Subject: [PATCH] Accept except-queues parameter --- lib/delayed/command.rb | 3 +++ lib/delayed/tasks.rb | 1 + lib/delayed/worker.rb | 8 +++++--- 3 files changed, 9 insertions(+), 3 deletions(-) diff --git a/lib/delayed/command.rb b/lib/delayed/command.rb index 281078242..f09a595a1 100644 --- a/lib/delayed/command.rb +++ b/lib/delayed/command.rb @@ -71,6 +71,9 @@ def initialize(args) # rubocop:disable MethodLength opt.on('--queue=queue', 'Specify which queue DJ must look up for jobs') do |queue| @options[:queues] = queue.split(',') end + opt.on('--except-queues=queues', 'Specify queues to ignore for this worker or a worker pool') do |queues| + @options[:except_queues] = queues.split(',') + end opt.on('--pool=queue1[,queue2][:worker_count]', 'Specify queues and number of workers for a worker pool') do |pool| parse_worker_pool(pool) end diff --git a/lib/delayed/tasks.rb b/lib/delayed/tasks.rb index 409ba48f8..190c7503c 100644 --- a/lib/delayed/tasks.rb +++ b/lib/delayed/tasks.rb @@ -19,6 +19,7 @@ :min_priority => ENV['MIN_PRIORITY'], :max_priority => ENV['MAX_PRIORITY'], :queues => (ENV['QUEUES'] || ENV['QUEUE'] || '').split(','), + :except_queues => (ENV['EXCEPT_QUEUES'] || ENV['EXCEPT_QUEUE'] || '').split(','), :quiet => ENV['QUIET'] } diff --git a/lib/delayed/worker.rb b/lib/delayed/worker.rb index 7b983a206..041e735d8 100644 --- a/lib/delayed/worker.rb +++ b/lib/delayed/worker.rb @@ -17,13 +17,14 @@ class Worker # rubocop:disable ClassLength DEFAULT_DEFAULT_PRIORITY = 0 DEFAULT_DELAY_JOBS = true DEFAULT_QUEUES = [].freeze + DEFAULT_EXCEPT_QUEUES = [].freeze DEFAULT_QUEUE_ATTRIBUTES = HashWithIndifferentAccess.new.freeze DEFAULT_READ_AHEAD = 5 cattr_accessor :min_priority, :max_priority, :max_attempts, :max_run_time, :default_priority, :sleep_delay, :logger, :delay_jobs, :queues, - :read_ahead, :plugins, :destroy_failed_jobs, :exit_on_complete, - :default_log_level + :except_queues, :read_ahead, :plugins, :destroy_failed_jobs, + :exit_on_complete, :default_log_level # Named queue into which jobs are enqueued by default cattr_accessor :default_queue_name @@ -41,6 +42,7 @@ def self.reset self.default_priority = DEFAULT_DEFAULT_PRIORITY self.delay_jobs = DEFAULT_DELAY_JOBS self.queues = DEFAULT_QUEUES + self.except_queues = DEFAULT_EXCEPT_QUEUES self.queue_attributes = DEFAULT_QUEUE_ATTRIBUTES self.read_ahead = DEFAULT_READ_AHEAD @lifecycle = nil @@ -132,7 +134,7 @@ def initialize(options = {}) @quiet = options.key?(:quiet) ? options[:quiet] : true @failed_reserve_count = 0 - [:min_priority, :max_priority, :sleep_delay, :read_ahead, :queues, :exit_on_complete].each do |option| + [:min_priority, :max_priority, :sleep_delay, :read_ahead, :queues, :except_queues, :exit_on_complete].each do |option| self.class.send("#{option}=", options[option]) if options.key?(option) end