lib/good_job/adapter.rb in good_job-3.10.1 vs lib/good_job/adapter.rb in good_job-3.11.0
- old
+ new
@@ -88,18 +88,24 @@
end
ensure
inline_executions.each(&:advisory_unlock)
end
- executions.reject(&:finished_at).group_by(&:queue_name).each do |queue_name, executions_by_queue|
- executions_by_queue.group_by(&:scheduled_at).each do |scheduled_at, executions_by_queue_and_scheduled_at|
- # TODO: have Adapter#create_thread handle state[:count] values
- state = { queue_name: queue_name, count: executions_by_queue_and_scheduled_at.size }
- state[:scheduled_at] = scheduled_at if scheduled_at
+ non_inline_executions = executions.reject(&:finished_at)
+ if non_inline_executions.any?
+ job_id_to_active_jobs = active_jobs.index_by(&:job_id)
+ non_inline_executions.group_by(&:queue_name).each do |queue_name, executions_by_queue|
+ executions_by_queue.group_by(&:scheduled_at).each do |scheduled_at, executions_by_queue_and_scheduled_at|
+ state = { queue_name: queue_name, count: executions_by_queue_and_scheduled_at.size }
+ state[:scheduled_at] = scheduled_at if scheduled_at
- executed_locally = execute_async? && @scheduler&.create_thread(state)
- Notifier.notify(state) unless executed_locally
+ executed_locally = execute_async? && @scheduler&.create_thread(state)
+ unless executed_locally
+ state[:count] = job_id_to_active_jobs.values_at(*executions_by_queue_and_scheduled_at.map(&:active_job_id)).count { |active_job| send_notify?(active_job) }
+ Notifier.notify(state) unless state[:count].zero?
+ end
+ end
end
end
active_jobs.count(&:provider_job_id)
end
@@ -134,11 +140,11 @@
else
job_state = { queue_name: execution.queue_name }
job_state[:scheduled_at] = execution.scheduled_at if execution.scheduled_at
executed_locally = execute_async? && @scheduler&.create_thread(job_state)
- Notifier.notify(job_state) unless executed_locally
+ Notifier.notify(job_state) if !executed_locally && send_notify?(active_job)
end
execution
end
@@ -191,11 +197,11 @@
# Start async executors
# @return [void]
def start_async
return unless execute_async?
- @notifier = GoodJob::Notifier.new
+ @notifier = GoodJob::Notifier.new(enable_listening: GoodJob.configuration.enable_listen_notify)
@poller = GoodJob::Poller.new(poll_interval: GoodJob.configuration.poll_interval)
@scheduler = GoodJob::Scheduler.from_configuration(GoodJob.configuration, warm_cache_on_initialize: true)
@notifier.recipients << [@scheduler, :create_thread]
@poller.recipients << [@scheduler, :create_thread]
@@ -223,8 +229,15 @@
self_caller.grep(%r{config.ru}).any? || # EXAMPLE: config.ru:3:in `block in <main>' OR config.ru:3:in `new_from_string'
self_caller.grep(%r{puma/request}).any? || # EXAMPLE: puma-5.6.4/lib/puma/request.rb:76:in `handle_request'
self_caller.grep(%{/rack/handler/}).any? || # EXAMPLE: iodine-0.7.44/lib/rack/handler/iodine.rb:13:in `start'
(Concurrent.on_jruby? && self_caller.grep(%r{jruby/rack/rails_booter}).any?) # EXAMPLE: uri:classloader:/jruby/rack/rails_booter.rb:83:in `load_environment'
end
+ end
+
+ def send_notify?(active_job)
+ return true unless active_job.respond_to?(:good_job_notify)
+ return false unless GoodJob.configuration.enable_listen_notify
+
+ !(active_job.good_job_notify == false || (active_job.class.good_job_notify == false && active_job.good_job_notify.nil?))
end
end
end