lib/good_job/notifier.rb in good_job-3.10.1 vs lib/good_job/notifier.rb in good_job-3.11.0
- old
+ new
@@ -66,24 +66,33 @@
# List of recipients that will receive notifications.
# @return [Array<#call, Array(Object, Symbol)>]
attr_reader :recipients
# @param recipients [Array<#call, Array(Object, Symbol)>]
- def initialize(*recipients)
+ # @param enable_listening [true, false]
+ def initialize(*recipients, enable_listening: true)
@recipients = Concurrent::Array.new(recipients)
+ @connected = Concurrent::AtomicBoolean.new(false)
@listening = Concurrent::AtomicBoolean.new(false)
@connection_errors_count = Concurrent::AtomicFixnum.new(0)
@connection_errors_reported = Concurrent::AtomicBoolean.new(false)
+ @enable_listening = enable_listening
self.class.instances << self
create_executor
listen
end
- # Tests whether the notifier is active and listening for new messages.
+ # Tests whether the notifier is active and has acquired a dedicated database connection.
# @return [true, false, nil]
+ def connected?
+ @connected.true?
+ end
+
+ # Tests whether the notifier is listening for new messages.
+ # @return [true, false, nil]
def listening?
@listening.true?
end
# Tests whether the notifier is running.
@@ -163,19 +172,21 @@
def create_executor
@executor = Concurrent::ThreadPoolExecutor.new(EXECUTOR_OPTIONS)
end
def listen(delay: 0)
- future = Concurrent::ScheduledTask.new(delay, args: [@recipients, executor, @listening], executor: @executor) do |thr_recipients, thr_executor, thr_listening|
+ future = Concurrent::ScheduledTask.new(delay, args: [@recipients, executor, @enable_listening, @listening], executor: @executor) do |thr_recipients, thr_executor, thr_enable_listening, thr_listening|
with_connection do
begin
Rails.application.executor.wrap do
run_callbacks :listen do
- ActiveSupport::Notifications.instrument("notifier_listen.good_job") do
- connection.execute("LISTEN #{CHANNEL}")
+ if thr_enable_listening
+ ActiveSupport::Notifications.instrument("notifier_listen.good_job") do
+ connection.execute("LISTEN #{CHANNEL}")
+ thr_listening.make_true
+ end
end
- thr_listening.make_true
end
end
while thr_executor.running?
wait_for_notify do |channel, payload|
@@ -193,13 +204,15 @@
end
end
ensure
Rails.application.executor.wrap do
run_callbacks :unlisten do
- thr_listening.make_false
- ActiveSupport::Notifications.instrument("notifier_unlisten.good_job") do
- connection.execute("UNLISTEN *")
+ if thr_enable_listening
+ ActiveSupport::Notifications.instrument("notifier_unlisten.good_job") do
+ thr_listening.make_false
+ connection.execute("UNLISTEN *")
+ end
end
end
end
end
end
@@ -213,23 +226,25 @@
self.connection = Execution.connection_pool.checkout.tap do |conn|
Execution.connection_pool.remove(conn)
end
end
connection.execute("SET application_name = #{connection.quote(self.class.name)}")
+ @connected.make_true
yield
ensure
+ @connected.make_false
connection&.disconnect!
self.connection = nil
end
def wait_for_notify
raw_connection = connection.raw_connection
- if raw_connection.respond_to?(:wait_for_notify)
+ if @enable_listening && raw_connection.respond_to?(:wait_for_notify)
raw_connection.wait_for_notify(WAIT_INTERVAL) do |channel, _pid, payload|
yield(channel, payload)
end
- elsif raw_connection.respond_to?(:jdbc_connection)
+ elsif @enable_listening && raw_connection.respond_to?(:jdbc_connection)
raw_connection.execute_query("SELECT 1")
notifications = raw_connection.jdbc_connection.getNotifications
Array(notifications).each do |notification|
channel = notification.getName
payload = notification.getParameter