lib/good_job/notifier.rb in good_job-3.18.2 vs lib/good_job/notifier.rb in good_job-3.18.3
- old
+ new
@@ -22,18 +22,20 @@
CHANNEL = 'good_job'
# Seconds to block while LISTENing for a message
WAIT_INTERVAL = 1
# Seconds to wait if database cannot be connected to
RECONNECT_INTERVAL = 5
+ # Number of consecutive connection errors before reporting an error
+ CONNECTION_ERRORS_REPORTING_THRESHOLD = 6
+
# Connection errors that will wait {RECONNECT_INTERVAL} before reconnecting
CONNECTION_ERRORS = %w[
ActiveRecord::ConnectionNotEstablished
ActiveRecord::StatementInvalid
PG::UnableToSend
PG::Error
].freeze
- CONNECTION_ERRORS_REPORTING_THRESHOLD = 3
# @!attribute [r] instances
# @!scope class
# List of all instantiated Notifiers in the current process.
# @return [Array<GoodJob::Notifier>, nil]
@@ -67,12 +69,12 @@
@executor = executor
@mutex = Mutex.new
@shutdown_event = Concurrent::Event.new.tap(&:set)
@running = Concurrent::AtomicBoolean.new(false)
- @connected = Concurrent::AtomicBoolean.new(false)
- @listening = Concurrent::AtomicBoolean.new(false)
+ @connected = Concurrent::Event.new
+ @listening = Concurrent::Event.new
@connection_errors_count = Concurrent::AtomicFixnum.new(0)
@connection_errors_reported = Concurrent::AtomicBoolean.new(false)
@enable_listening = enable_listening
@task = nil
@@ -83,19 +85,29 @@
def running?
@executor.running? && @running.true?
end
# Tests whether the notifier is active and has acquired a dedicated database connection.
+ # @param timeout [Numeric, nil] Seconds to wait for condition to be true, -1 is forever
# @return [true, false, nil]
- def connected?
- @connected.true?
+ def connected?(timeout: nil)
+ if timeout.nil?
+ @connected.set?
+ else
+ @connected.wait(timeout == -1 ? nil : timeout)
+ end
end
# Tests whether the notifier is listening for new messages.
+ # @param timeout [Numeric, nil] Seconds to wait for condition to be true, -1 is forever
# @return [true, false, nil]
- def listening?
- @listening.true?
+ def listening?(timeout: nil)
+ if timeout.nil?
+ @listening.set?
+ else
+ @listening.wait(timeout == -1 ? nil : timeout)
+ end
end
def shutdown?
@shutdown_event.set?
end
@@ -112,15 +124,16 @@
synchronize do
@running.make_false
if @executor.shutdown? || @task&.complete?
# clean up in the even the executor is killed
- @connected.make_false
- @listening.make_false
+ @connected.reset
+ @listening.reset
@shutdown_event.set
else
@shutdown_event.wait(timeout == -1 ? nil : timeout) unless timeout.nil?
+ @connected.reset if @shutdown_event.set?
end
@shutdown_event.set?
end
end
@@ -150,10 +163,11 @@
end
if connection_error
@connection_errors_count.increment
if @connection_errors_reported.false? && @connection_errors_count.value >= CONNECTION_ERRORS_REPORTING_THRESHOLD
+ @connected.reset
GoodJob._on_thread_error(thread_error)
@connection_errors_reported.make_true
end
else
GoodJob._on_thread_error(thread_error)
@@ -178,19 +192,21 @@
create_listen_task(delay: 0)
end
end
def create_listen_task(delay: 0)
- @task = Concurrent::ScheduledTask.new(delay, args: [@recipients, @running, @executor, @enable_listening, @listening], executor: @executor) do |thr_recipients, thr_running, thr_executor, thr_enable_listening, thr_listening|
+ @task = Concurrent::ScheduledTask.new(delay, args: [@recipients, @running, @executor, @enable_listening, @connected, @listening], executor: @executor) do |thr_recipients, thr_running, thr_executor, thr_enable_listening, thr_connected, thr_listening|
with_connection do
+ thr_connected.set
+
begin
Rails.application.executor.wrap do
run_callbacks :listen do
if thr_enable_listening
ActiveSupport::Notifications.instrument("notifier_listen.good_job") do
connection.execute("LISTEN #{CHANNEL}")
- thr_listening.make_true
+ thr_listening.set
end
end
end
end
@@ -214,11 +230,11 @@
ensure
Rails.application.executor.wrap do
run_callbacks :unlisten do
if thr_enable_listening
ActiveSupport::Notifications.instrument("notifier_unlisten.good_job") do
- thr_listening.make_false
+ thr_listening.reset
connection.execute("UNLISTEN *")
end
end
end
end
@@ -235,14 +251,12 @@
self.connection = Execution.connection_pool.checkout.tap do |conn|
Execution.connection_pool.remove(conn)
end
end
connection.execute("SET application_name = #{connection.quote(self.class.name)}")
- @connected.make_true
yield
ensure
- @connected.make_false
connection&.disconnect!
self.connection = nil
end
def wait_for_notify