lib/good_job/notifier.rb in good_job-3.15.13 vs lib/good_job/notifier.rb in good_job-3.15.14

- old
+ new

@@ -1,9 +1,9 @@ # frozen_string_literal: true require 'active_support/core_ext/module/attribute_accessors_per_thread' require 'concurrent/atomic/atomic_boolean' -require "good_job/notifier/process_registration" +require "good_job/notifier/process_heartbeat" module GoodJob # :nodoc: # # Notifiers hook into Postgres LISTEN/NOTIFY functionality to emit and listen for notifications across processes. # @@ -11,13 +11,13 @@ # A notifier will LISTEN for messages by creating a background thread that runs in an instance of +Concurrent::ThreadPoolExecutor+. # When a message is received, the notifier passes the message to each of its recipients. # class Notifier include ActiveSupport::Callbacks - define_callbacks :listen, :unlisten + define_callbacks :listen, :tick, :unlisten - include Notifier::ProcessRegistration + include ProcessHeartbeat # Default Postgres channel for LISTEN/NOTIFY CHANNEL = 'good_job' # Defaults for instance of Concurrent::ThreadPoolExecutor EXECUTOR_OPTIONS = { @@ -187,21 +187,23 @@ end end end while thr_executor.running? - wait_for_notify do |channel, payload| - next unless channel == CHANNEL + run_callbacks :tick do + wait_for_notify do |channel, payload| + next unless channel == CHANNEL - ActiveSupport::Notifications.instrument("notifier_notified.good_job", { payload: payload }) - parsed_payload = JSON.parse(payload, symbolize_names: true) - thr_recipients.each do |recipient| - target, method_name = recipient.is_a?(Array) ? recipient : [recipient, :call] - target.send(method_name, parsed_payload) + ActiveSupport::Notifications.instrument("notifier_notified.good_job", { payload: payload }) + parsed_payload = JSON.parse(payload, symbolize_names: true) + thr_recipients.each do |recipient| + target, method_name = recipient.is_a?(Array) ? recipient : [recipient, :call] + target.send(method_name, parsed_payload) + end end - end - reset_connection_errors + reset_connection_errors + end end end ensure Rails.application.executor.wrap do run_callbacks :unlisten do