lib/good_job/notifier.rb in good_job-3.28.2 vs lib/good_job/notifier.rb in good_job-3.29.3

- old
+ new

@@ -1,9 +1,10 @@ # frozen_string_literal: true require 'active_support/core_ext/module/attribute_accessors_per_thread' require 'concurrent/atomic/atomic_boolean' +require "concurrent/scheduled_task" require "good_job/notifier/process_heartbeat" module GoodJob # :nodoc: # # Notifiers hook into Postgres LISTEN/NOTIFY functionality to emit and listen for notifications across processes. @@ -60,25 +61,25 @@ # @return [Array<#call, Array(Object, Symbol)>] attr_reader :recipients # @param recipients [Array<#call, Array(Object, Symbol)>] # @param enable_listening [true, false] - # @param executor [Concurrent::ExecutorService] - def initialize(*recipients, enable_listening: true, executor: Concurrent.global_io_executor) + def initialize(*recipients, enable_listening: true, capsule: GoodJob.capsule, executor: Concurrent.global_io_executor) @recipients = Concurrent::Array.new(recipients) @enable_listening = enable_listening @executor = executor - @mutex = Mutex.new + @monitor = Monitor.new @shutdown_event = Concurrent::Event.new.tap(&:set) @running = Concurrent::AtomicBoolean.new(false) @connected = Concurrent::Event.new @listening = Concurrent::Event.new @connection_errors_count = Concurrent::AtomicFixnum.new(0) @connection_errors_reported = Concurrent::AtomicBoolean.new(false) @enable_listening = enable_listening @task = nil + @capsule = capsule start self.class.instances << self end @@ -181,10 +182,12 @@ end end private + delegate :synchronize, to: :@monitor + def start synchronize do return if @running.true? @running.make_true @@ -209,24 +212,24 @@ end end end while thr_executor.running? && thr_running.true? - run_callbacks :tick do - wait_for_notify do |channel, payload| - next unless channel == CHANNEL + Rails.application.executor.wrap { run_callbacks(:tick) } - ActiveSupport::Notifications.instrument("notifier_notified.good_job", { payload: payload }) - parsed_payload = JSON.parse(payload, symbolize_names: true) - thr_recipients.each do |recipient| - target, method_name = recipient.is_a?(Array) ? recipient : [recipient, :call] - target.send(method_name, parsed_payload) - end - end + wait_for_notify do |channel, payload| + next unless channel == CHANNEL - reset_connection_errors + ActiveSupport::Notifications.instrument("notifier_notified.good_job", { payload: payload }) + parsed_payload = JSON.parse(payload, symbolize_names: true) + thr_recipients.each do |recipient| + target, method_name = recipient.is_a?(Array) ? recipient : [recipient, :call] + target.send(method_name, parsed_payload) + end end + + reset_connection_errors end end ensure Rails.application.executor.wrap do run_callbacks :unlisten do @@ -281,16 +284,8 @@ end def reset_connection_errors @connection_errors_count.value = 0 @connection_errors_reported.make_false - end - - def synchronize(&block) - if @mutex.owned? - yield - else - @mutex.synchronize(&block) - end end end end