lib/good_job/notifier.rb in good_job-3.15.13 vs lib/good_job/notifier.rb in good_job-3.15.14
- old
+ new
@@ -1,9 +1,9 @@
# frozen_string_literal: true
require 'active_support/core_ext/module/attribute_accessors_per_thread'
require 'concurrent/atomic/atomic_boolean'
-require "good_job/notifier/process_registration"
+require "good_job/notifier/process_heartbeat"
module GoodJob # :nodoc:
#
# Notifiers hook into Postgres LISTEN/NOTIFY functionality to emit and listen for notifications across processes.
#
@@ -11,13 +11,13 @@
# A notifier will LISTEN for messages by creating a background thread that runs in an instance of +Concurrent::ThreadPoolExecutor+.
# When a message is received, the notifier passes the message to each of its recipients.
#
class Notifier
include ActiveSupport::Callbacks
- define_callbacks :listen, :unlisten
+ define_callbacks :listen, :tick, :unlisten
- include Notifier::ProcessRegistration
+ include ProcessHeartbeat
# Default Postgres channel for LISTEN/NOTIFY
CHANNEL = 'good_job'
# Defaults for instance of Concurrent::ThreadPoolExecutor
EXECUTOR_OPTIONS = {
@@ -187,21 +187,23 @@
end
end
end
while thr_executor.running?
- wait_for_notify do |channel, payload|
- next unless channel == CHANNEL
+ run_callbacks :tick do
+ wait_for_notify do |channel, payload|
+ next unless channel == CHANNEL
- ActiveSupport::Notifications.instrument("notifier_notified.good_job", { payload: payload })
- parsed_payload = JSON.parse(payload, symbolize_names: true)
- thr_recipients.each do |recipient|
- target, method_name = recipient.is_a?(Array) ? recipient : [recipient, :call]
- target.send(method_name, parsed_payload)
+ ActiveSupport::Notifications.instrument("notifier_notified.good_job", { payload: payload })
+ parsed_payload = JSON.parse(payload, symbolize_names: true)
+ thr_recipients.each do |recipient|
+ target, method_name = recipient.is_a?(Array) ? recipient : [recipient, :call]
+ target.send(method_name, parsed_payload)
+ end
end
- end
- reset_connection_errors
+ reset_connection_errors
+ end
end
end
ensure
Rails.application.executor.wrap do
run_callbacks :unlisten do