lib/good_job/notifier.rb in good_job-3.16.3 vs lib/good_job/notifier.rb in good_job-3.16.4
- old
+ new
@@ -18,20 +18,10 @@
include ProcessHeartbeat
# Default Postgres channel for LISTEN/NOTIFY
CHANNEL = 'good_job'
- # Defaults for instance of Concurrent::ThreadPoolExecutor
- EXECUTOR_OPTIONS = {
- name: name,
- min_threads: 0,
- max_threads: 1,
- auto_terminate: true,
- idletime: 60,
- max_queue: 1,
- fallback_policy: :discard,
- }.freeze
# Seconds to block while LISTENing for a message
WAIT_INTERVAL = 1
# Seconds to wait if database cannot be connected to
RECONNECT_INTERVAL = 5
# Connection errors that will wait {RECONNECT_INTERVAL} before reconnecting
@@ -68,23 +58,34 @@
# @return [Array<#call, Array(Object, Symbol)>]
attr_reader :recipients
# @param recipients [Array<#call, Array(Object, Symbol)>]
# @param enable_listening [true, false]
- def initialize(*recipients, enable_listening: true)
+ # @param executor [Concurrent::ExecutorService]
+ def initialize(*recipients, enable_listening: true, executor: Concurrent.global_io_executor)
@recipients = Concurrent::Array.new(recipients)
+ @enable_listening = enable_listening
+ @executor = executor
+
+ @mutex = Mutex.new
+ @shutdown_event = Concurrent::Event.new.tap(&:set)
+ @running = Concurrent::AtomicBoolean.new(false)
@connected = Concurrent::AtomicBoolean.new(false)
@listening = Concurrent::AtomicBoolean.new(false)
@connection_errors_count = Concurrent::AtomicFixnum.new(0)
@connection_errors_reported = Concurrent::AtomicBoolean.new(false)
@enable_listening = enable_listening
+ @task = nil
- create_executor
- listen
+ start
self.class.instances << self
end
+ def running?
+ @executor.running? && @running.true?
+ end
+
# Tests whether the notifier is active and has acquired a dedicated database connection.
# @return [true, false, nil]
def connected?
@connected.true?
end
@@ -93,48 +94,47 @@
# @return [true, false, nil]
def listening?
@listening.true?
end
- # Tests whether the notifier is running.
- # @!method running?
- # @return [true, false, nil]
- delegate :running?, to: :executor, allow_nil: true
+ def shutdown?
+ @shutdown_event.set?
+ end
- # Tests whether the scheduler is shutdown.
- # @!method shutdown?
- # @return [true, false, nil]
- delegate :shutdown?, to: :executor, allow_nil: true
-
# Shut down the notifier.
# This stops the background LISTENing thread.
# Use {#shutdown?} to determine whether threads have stopped.
# @param timeout [Numeric, nil] Seconds to wait for active threads.
# * +nil+, the scheduler will trigger a shutdown but not wait for it to complete.
# * +-1+, the scheduler will wait until the shutdown is complete.
- # * +0+, the scheduler will immediately shutdown and stop any threads.
# * A positive number will wait that many seconds before stopping any remaining active threads.
# @return [void]
def shutdown(timeout: -1)
- return if executor.nil? || executor.shutdown?
+ synchronize do
+ @running.make_false
- executor.shutdown if executor.running?
-
- if executor.shuttingdown? && timeout # rubocop:disable Style/GuardClause
- executor_wait = timeout.negative? ? nil : timeout
- executor.kill unless executor.wait_for_termination(executor_wait)
+ if @executor.shutdown? || @task&.complete?
+ # clean up in the even the executor is killed
+ @connected.make_false
+ @listening.make_false
+ @shutdown_event.set
+ else
+ @shutdown_event.wait(timeout == -1 ? nil : timeout) unless timeout.nil?
+ end
+ @shutdown_event.set?
end
end
# Restart the notifier.
# When shutdown, start; or shutdown and start.
# @param timeout [nil, Numeric] Seconds to wait; shares same values as {#shutdown}.
# @return [void]
def restart(timeout: -1)
- shutdown(timeout: timeout) if running?
- create_executor
- listen
+ synchronize do
+ shutdown(timeout: timeout) unless @shutdown_event.set?
+ start
+ end
end
# Invoked on completion of ThreadPoolExecutor task
# @!visibility private
# @return [void]
@@ -158,25 +158,31 @@
else
GoodJob._on_thread_error(thread_error)
end
end
- return if shutdown?
-
- listen(delay: connection_error ? RECONNECT_INTERVAL : 0)
+ if @running.true?
+ create_listen_task(delay: connection_error ? RECONNECT_INTERVAL : 0)
+ else
+ @shutdown_event.set
+ end
end
private
- attr_reader :executor
+ def start
+ synchronize do
+ return if @running.true?
- def create_executor
- @executor = Concurrent::ThreadPoolExecutor.new(EXECUTOR_OPTIONS)
+ @running.make_true
+ @shutdown_event.reset
+ create_listen_task(delay: 0)
+ end
end
- def listen(delay: 0)
- future = Concurrent::ScheduledTask.new(delay, args: [@recipients, executor, @enable_listening, @listening], executor: @executor) do |thr_recipients, thr_executor, thr_enable_listening, thr_listening|
+ def create_listen_task(delay: 0)
+ @task = Concurrent::ScheduledTask.new(delay, args: [@recipients, @running, @executor, @enable_listening, @listening], executor: @executor) do |thr_recipients, thr_running, thr_executor, thr_enable_listening, thr_listening|
with_connection do
begin
Rails.application.executor.wrap do
run_callbacks :listen do
if thr_enable_listening
@@ -186,11 +192,11 @@
end
end
end
end
- while thr_executor.running?
+ while thr_executor.running? && thr_running.true?
run_callbacks :tick do
wait_for_notify do |channel, payload|
next unless channel == CHANNEL
ActiveSupport::Notifications.instrument("notifier_notified.good_job", { payload: payload })
@@ -217,12 +223,13 @@
end
end
end
end
- future.add_observer(self, :listen_observer)
- future.execute
+ @task.add_observer(self, :listen_observer)
+ @task.execute
+ @task
end
def with_connection
Rails.application.executor.wrap do
self.connection = Execution.connection_pool.checkout.tap do |conn|
@@ -260,8 +267,16 @@
end
def reset_connection_errors
@connection_errors_count.value = 0
@connection_errors_reported.make_false
+ end
+
+ def synchronize(&block)
+ if @mutex.owned?
+ yield
+ else
+ @mutex.synchronize(&block)
+ end
end
end
end