lib/good_job/notifier.rb in good_job-2.7.4 vs lib/good_job/notifier.rb in good_job-2.8.0
- old
+ new
@@ -1,6 +1,7 @@
# frozen_string_literal: true
+require 'active_support/core_ext/module/attribute_accessors_per_thread'
require 'concurrent/atomic/atomic_boolean'
module GoodJob # :nodoc:
#
# Notifiers hook into Postgres LISTEN/NOTIFY functionality to emit and listen for notifications across processes.
@@ -8,10 +9,15 @@
# Notifiers can emit NOTIFY messages through Postgres.
# A notifier will LISTEN for messages by creating a background thread that runs in an instance of +Concurrent::ThreadPoolExecutor+.
# When a message is received, the notifier passes the message to each of its recipients.
#
class Notifier
+ include ActiveSupport::Callbacks
+ define_callbacks :listen, :unlisten
+
+ include Notifier::ProcessRegistration
+
# Raised if the Database adapter does not implement LISTEN.
AdapterCannotListenError = Class.new(StandardError)
# Default Postgres channel for LISTEN/NOTIFY
CHANNEL = 'good_job'
@@ -41,10 +47,16 @@
# @!scope class
# List of all instantiated Notifiers in the current process.
# @return [Array<GoodJob::Notifier>, nil]
cattr_reader :instances, default: [], instance_reader: false
+ # @!attribute [rw] connection
+ # @!scope class
+ # ActiveRecord Connection that has been established for the Notifier.
+ # @return [ActiveRecord::ConnectionAdapters::AbstractAdapter, nil]
+ thread_cattr_accessor :connection
+
# Send a message via Postgres NOTIFY
# @param message [#to_json]
def self.notify(message)
connection = Execution.connection
connection.exec_query <<~SQL.squish
@@ -144,51 +156,67 @@
@executor = Concurrent::ThreadPoolExecutor.new(EXECUTOR_OPTIONS)
end
def listen(delay: 0)
future = Concurrent::ScheduledTask.new(delay, args: [@recipients, executor, @listening], executor: @executor) do |thr_recipients, thr_executor, thr_listening|
- with_listen_connection do |conn|
- ActiveSupport::Notifications.instrument("notifier_listen.good_job") do
- conn.async_exec("LISTEN #{CHANNEL}").clear
- end
+ with_connection do
+ begin
+ run_callbacks :listen do
+ ActiveSupport::Notifications.instrument("notifier_listen.good_job") do
+ connection.execute("LISTEN #{CHANNEL}")
+ end
+ thr_listening.make_true
+ end
- ActiveSupport::Dependencies.interlock.permit_concurrent_loads do
- thr_listening.make_true
- while thr_executor.running?
- conn.wait_for_notify(WAIT_INTERVAL) do |channel, _pid, payload|
- next unless channel == CHANNEL
+ ActiveSupport::Dependencies.interlock.permit_concurrent_loads do
+ while thr_executor.running?
+ wait_for_notify do |channel, payload|
+ next unless channel == CHANNEL
- ActiveSupport::Notifications.instrument("notifier_notified.good_job", { payload: payload })
- parsed_payload = JSON.parse(payload, symbolize_names: true)
- thr_recipients.each do |recipient|
- target, method_name = recipient.is_a?(Array) ? recipient : [recipient, :call]
- target.send(method_name, parsed_payload)
+ ActiveSupport::Notifications.instrument("notifier_notified.good_job", { payload: payload })
+ parsed_payload = JSON.parse(payload, symbolize_names: true)
+ thr_recipients.each do |recipient|
+ target, method_name = recipient.is_a?(Array) ? recipient : [recipient, :call]
+ target.send(method_name, parsed_payload)
+ end
end
end
end
end
ensure
- thr_listening.make_false
- ActiveSupport::Notifications.instrument("notifier_unlisten.good_job") do
- conn.async_exec("UNLISTEN *").clear
+ run_callbacks :unlisten do
+ thr_listening.make_false
+ ActiveSupport::Notifications.instrument("notifier_unlisten.good_job") do
+ connection.execute("UNLISTEN *")
+ end
end
end
end
future.add_observer(self, :listen_observer)
future.execute
end
- def with_listen_connection
- ar_conn = Execution.connection_pool.checkout.tap do |conn|
+ def with_connection
+ self.connection = Execution.connection_pool.checkout.tap do |conn|
Execution.connection_pool.remove(conn)
end
- pg_conn = ar_conn.raw_connection
- raise AdapterCannotListenError unless pg_conn.respond_to? :wait_for_notify
+ connection.execute("SET application_name = #{connection.quote(self.class.name)}")
- pg_conn.async_exec("SET application_name = #{pg_conn.escape_identifier(self.class.name)}").clear
- yield pg_conn
+ yield
ensure
- ar_conn&.disconnect!
+ connection&.disconnect!
+ self.connection = nil
+ end
+
+ def wait_for_notify
+ raw_connection = connection.raw_connection
+ if raw_connection.respond_to?(:wait_for_notify)
+ raw_connection.wait_for_notify(WAIT_INTERVAL) do |channel, _pid, payload|
+ yield(channel, payload)
+ end
+ else
+ sleep WAIT_INTERVAL
+ end
end
end
end