lib/good_job/notifier.rb in good_job-1.11.1 vs lib/good_job/notifier.rb in good_job-1.11.2
- old
+ new
@@ -22,10 +22,12 @@
auto_terminate: true,
idletime: 60,
max_queue: 1,
fallback_policy: :discard,
}.freeze
+ # Seconds to wait if database cannot be connected to
+ RECONNECT_INTERVAL = 5
# Seconds to block while LISTENing for a message
WAIT_INTERVAL = 1
# @!attribute [r] instances
# @!scope class
@@ -112,22 +114,28 @@
if thread_error
GoodJob.on_thread_error.call(thread_error) if GoodJob.on_thread_error.respond_to?(:call)
ActiveSupport::Notifications.instrument("notifier_notify_error.good_job", { error: thread_error })
end
- listen unless shutdown?
+ return if shutdown?
+
+ if thread_error.is_a?(ActiveRecord::ConnectionNotEstablished) || thread_error.is_a?(ActiveRecord::StatementInvalid)
+ listen(delay: RECONNECT_INTERVAL)
+ else
+ listen
+ end
end
private
attr_reader :executor
def create_executor
@executor = Concurrent::ThreadPoolExecutor.new(EXECUTOR_OPTIONS)
end
- def listen
- future = Concurrent::Future.new(args: [@recipients, executor, @listening], executor: @executor) do |thr_recipients, thr_executor, thr_listening|
+ def listen(delay: 0)
+ future = Concurrent::ScheduledTask.new(delay, args: [@recipients, executor, @listening], executor: @executor) do |thr_recipients, thr_executor, thr_listening|
with_listen_connection do |conn|
ActiveSupport::Notifications.instrument("notifier_listen.good_job") do
conn.async_exec("LISTEN #{CHANNEL}").clear
end