lib/good_job/notifier.rb in good_job-3.28.2 vs lib/good_job/notifier.rb in good_job-3.29.3
- old
+ new
@@ -1,9 +1,10 @@
# frozen_string_literal: true
require 'active_support/core_ext/module/attribute_accessors_per_thread'
require 'concurrent/atomic/atomic_boolean'
+require "concurrent/scheduled_task"
require "good_job/notifier/process_heartbeat"
module GoodJob # :nodoc:
#
# Notifiers hook into Postgres LISTEN/NOTIFY functionality to emit and listen for notifications across processes.
@@ -60,25 +61,25 @@
# @return [Array<#call, Array(Object, Symbol)>]
attr_reader :recipients
# @param recipients [Array<#call, Array(Object, Symbol)>]
# @param enable_listening [true, false]
- # @param executor [Concurrent::ExecutorService]
- def initialize(*recipients, enable_listening: true, executor: Concurrent.global_io_executor)
+ def initialize(*recipients, enable_listening: true, capsule: GoodJob.capsule, executor: Concurrent.global_io_executor)
@recipients = Concurrent::Array.new(recipients)
@enable_listening = enable_listening
@executor = executor
- @mutex = Mutex.new
+ @monitor = Monitor.new
@shutdown_event = Concurrent::Event.new.tap(&:set)
@running = Concurrent::AtomicBoolean.new(false)
@connected = Concurrent::Event.new
@listening = Concurrent::Event.new
@connection_errors_count = Concurrent::AtomicFixnum.new(0)
@connection_errors_reported = Concurrent::AtomicBoolean.new(false)
@enable_listening = enable_listening
@task = nil
+ @capsule = capsule
start
self.class.instances << self
end
@@ -181,10 +182,12 @@
end
end
private
+ delegate :synchronize, to: :@monitor
+
def start
synchronize do
return if @running.true?
@running.make_true
@@ -209,24 +212,24 @@
end
end
end
while thr_executor.running? && thr_running.true?
- run_callbacks :tick do
- wait_for_notify do |channel, payload|
- next unless channel == CHANNEL
+ Rails.application.executor.wrap { run_callbacks(:tick) }
- ActiveSupport::Notifications.instrument("notifier_notified.good_job", { payload: payload })
- parsed_payload = JSON.parse(payload, symbolize_names: true)
- thr_recipients.each do |recipient|
- target, method_name = recipient.is_a?(Array) ? recipient : [recipient, :call]
- target.send(method_name, parsed_payload)
- end
- end
+ wait_for_notify do |channel, payload|
+ next unless channel == CHANNEL
- reset_connection_errors
+ ActiveSupport::Notifications.instrument("notifier_notified.good_job", { payload: payload })
+ parsed_payload = JSON.parse(payload, symbolize_names: true)
+ thr_recipients.each do |recipient|
+ target, method_name = recipient.is_a?(Array) ? recipient : [recipient, :call]
+ target.send(method_name, parsed_payload)
+ end
end
+
+ reset_connection_errors
end
end
ensure
Rails.application.executor.wrap do
run_callbacks :unlisten do
@@ -281,16 +284,8 @@
end
def reset_connection_errors
@connection_errors_count.value = 0
@connection_errors_reported.make_false
- end
-
- def synchronize(&block)
- if @mutex.owned?
- yield
- else
- @mutex.synchronize(&block)
- end
end
end
end