lib/good_job/notifier.rb in good_job-2.7.4 vs lib/good_job/notifier.rb in good_job-2.8.0

- old
+ new

@@ -1,6 +1,7 @@ # frozen_string_literal: true +require 'active_support/core_ext/module/attribute_accessors_per_thread' require 'concurrent/atomic/atomic_boolean' module GoodJob # :nodoc: # # Notifiers hook into Postgres LISTEN/NOTIFY functionality to emit and listen for notifications across processes. @@ -8,10 +9,15 @@ # Notifiers can emit NOTIFY messages through Postgres. # A notifier will LISTEN for messages by creating a background thread that runs in an instance of +Concurrent::ThreadPoolExecutor+. # When a message is received, the notifier passes the message to each of its recipients. # class Notifier + include ActiveSupport::Callbacks + define_callbacks :listen, :unlisten + + include Notifier::ProcessRegistration + # Raised if the Database adapter does not implement LISTEN. AdapterCannotListenError = Class.new(StandardError) # Default Postgres channel for LISTEN/NOTIFY CHANNEL = 'good_job' @@ -41,10 +47,16 @@ # @!scope class # List of all instantiated Notifiers in the current process. # @return [Array<GoodJob::Notifier>, nil] cattr_reader :instances, default: [], instance_reader: false + # @!attribute [rw] connection + # @!scope class + # ActiveRecord Connection that has been established for the Notifier. + # @return [ActiveRecord::ConnectionAdapters::AbstractAdapter, nil] + thread_cattr_accessor :connection + # Send a message via Postgres NOTIFY # @param message [#to_json] def self.notify(message) connection = Execution.connection connection.exec_query <<~SQL.squish @@ -144,51 +156,67 @@ @executor = Concurrent::ThreadPoolExecutor.new(EXECUTOR_OPTIONS) end def listen(delay: 0) future = Concurrent::ScheduledTask.new(delay, args: [@recipients, executor, @listening], executor: @executor) do |thr_recipients, thr_executor, thr_listening| - with_listen_connection do |conn| - ActiveSupport::Notifications.instrument("notifier_listen.good_job") do - conn.async_exec("LISTEN #{CHANNEL}").clear - end + with_connection do + begin + run_callbacks :listen do + ActiveSupport::Notifications.instrument("notifier_listen.good_job") do + connection.execute("LISTEN #{CHANNEL}") + end + thr_listening.make_true + end - ActiveSupport::Dependencies.interlock.permit_concurrent_loads do - thr_listening.make_true - while thr_executor.running? - conn.wait_for_notify(WAIT_INTERVAL) do |channel, _pid, payload| - next unless channel == CHANNEL + ActiveSupport::Dependencies.interlock.permit_concurrent_loads do + while thr_executor.running? + wait_for_notify do |channel, payload| + next unless channel == CHANNEL - ActiveSupport::Notifications.instrument("notifier_notified.good_job", { payload: payload }) - parsed_payload = JSON.parse(payload, symbolize_names: true) - thr_recipients.each do |recipient| - target, method_name = recipient.is_a?(Array) ? recipient : [recipient, :call] - target.send(method_name, parsed_payload) + ActiveSupport::Notifications.instrument("notifier_notified.good_job", { payload: payload }) + parsed_payload = JSON.parse(payload, symbolize_names: true) + thr_recipients.each do |recipient| + target, method_name = recipient.is_a?(Array) ? recipient : [recipient, :call] + target.send(method_name, parsed_payload) + end end end end end ensure - thr_listening.make_false - ActiveSupport::Notifications.instrument("notifier_unlisten.good_job") do - conn.async_exec("UNLISTEN *").clear + run_callbacks :unlisten do + thr_listening.make_false + ActiveSupport::Notifications.instrument("notifier_unlisten.good_job") do + connection.execute("UNLISTEN *") + end end end end future.add_observer(self, :listen_observer) future.execute end - def with_listen_connection - ar_conn = Execution.connection_pool.checkout.tap do |conn| + def with_connection + self.connection = Execution.connection_pool.checkout.tap do |conn| Execution.connection_pool.remove(conn) end - pg_conn = ar_conn.raw_connection - raise AdapterCannotListenError unless pg_conn.respond_to? :wait_for_notify + connection.execute("SET application_name = #{connection.quote(self.class.name)}") - pg_conn.async_exec("SET application_name = #{pg_conn.escape_identifier(self.class.name)}").clear - yield pg_conn + yield ensure - ar_conn&.disconnect! + connection&.disconnect! + self.connection = nil + end + + def wait_for_notify + raw_connection = connection.raw_connection + if raw_connection.respond_to?(:wait_for_notify) + raw_connection.wait_for_notify(WAIT_INTERVAL) do |channel, _pid, payload| + yield(channel, payload) + end + else + sleep WAIT_INTERVAL + end end end end