lib/good_job/notifier.rb in good_job-3.10.1 vs lib/good_job/notifier.rb in good_job-3.11.0

- old
+ new

@@ -66,24 +66,33 @@ # List of recipients that will receive notifications. # @return [Array<#call, Array(Object, Symbol)>] attr_reader :recipients # @param recipients [Array<#call, Array(Object, Symbol)>] - def initialize(*recipients) + # @param enable_listening [true, false] + def initialize(*recipients, enable_listening: true) @recipients = Concurrent::Array.new(recipients) + @connected = Concurrent::AtomicBoolean.new(false) @listening = Concurrent::AtomicBoolean.new(false) @connection_errors_count = Concurrent::AtomicFixnum.new(0) @connection_errors_reported = Concurrent::AtomicBoolean.new(false) + @enable_listening = enable_listening self.class.instances << self create_executor listen end - # Tests whether the notifier is active and listening for new messages. + # Tests whether the notifier is active and has acquired a dedicated database connection. # @return [true, false, nil] + def connected? + @connected.true? + end + + # Tests whether the notifier is listening for new messages. + # @return [true, false, nil] def listening? @listening.true? end # Tests whether the notifier is running. @@ -163,19 +172,21 @@ def create_executor @executor = Concurrent::ThreadPoolExecutor.new(EXECUTOR_OPTIONS) end def listen(delay: 0) - future = Concurrent::ScheduledTask.new(delay, args: [@recipients, executor, @listening], executor: @executor) do |thr_recipients, thr_executor, thr_listening| + future = Concurrent::ScheduledTask.new(delay, args: [@recipients, executor, @enable_listening, @listening], executor: @executor) do |thr_recipients, thr_executor, thr_enable_listening, thr_listening| with_connection do begin Rails.application.executor.wrap do run_callbacks :listen do - ActiveSupport::Notifications.instrument("notifier_listen.good_job") do - connection.execute("LISTEN #{CHANNEL}") + if thr_enable_listening + ActiveSupport::Notifications.instrument("notifier_listen.good_job") do + connection.execute("LISTEN #{CHANNEL}") + thr_listening.make_true + end end - thr_listening.make_true end end while thr_executor.running? wait_for_notify do |channel, payload| @@ -193,13 +204,15 @@ end end ensure Rails.application.executor.wrap do run_callbacks :unlisten do - thr_listening.make_false - ActiveSupport::Notifications.instrument("notifier_unlisten.good_job") do - connection.execute("UNLISTEN *") + if thr_enable_listening + ActiveSupport::Notifications.instrument("notifier_unlisten.good_job") do + thr_listening.make_false + connection.execute("UNLISTEN *") + end end end end end end @@ -213,23 +226,25 @@ self.connection = Execution.connection_pool.checkout.tap do |conn| Execution.connection_pool.remove(conn) end end connection.execute("SET application_name = #{connection.quote(self.class.name)}") + @connected.make_true yield ensure + @connected.make_false connection&.disconnect! self.connection = nil end def wait_for_notify raw_connection = connection.raw_connection - if raw_connection.respond_to?(:wait_for_notify) + if @enable_listening && raw_connection.respond_to?(:wait_for_notify) raw_connection.wait_for_notify(WAIT_INTERVAL) do |channel, _pid, payload| yield(channel, payload) end - elsif raw_connection.respond_to?(:jdbc_connection) + elsif @enable_listening && raw_connection.respond_to?(:jdbc_connection) raw_connection.execute_query("SELECT 1") notifications = raw_connection.jdbc_connection.getNotifications Array(notifications).each do |notification| channel = notification.getName payload = notification.getParameter