lib/good_job/notifier.rb in good_job-3.16.3 vs lib/good_job/notifier.rb in good_job-3.16.4

- old
+ new

@@ -18,20 +18,10 @@ include ProcessHeartbeat # Default Postgres channel for LISTEN/NOTIFY CHANNEL = 'good_job' - # Defaults for instance of Concurrent::ThreadPoolExecutor - EXECUTOR_OPTIONS = { - name: name, - min_threads: 0, - max_threads: 1, - auto_terminate: true, - idletime: 60, - max_queue: 1, - fallback_policy: :discard, - }.freeze # Seconds to block while LISTENing for a message WAIT_INTERVAL = 1 # Seconds to wait if database cannot be connected to RECONNECT_INTERVAL = 5 # Connection errors that will wait {RECONNECT_INTERVAL} before reconnecting @@ -68,23 +58,34 @@ # @return [Array<#call, Array(Object, Symbol)>] attr_reader :recipients # @param recipients [Array<#call, Array(Object, Symbol)>] # @param enable_listening [true, false] - def initialize(*recipients, enable_listening: true) + # @param executor [Concurrent::ExecutorService] + def initialize(*recipients, enable_listening: true, executor: Concurrent.global_io_executor) @recipients = Concurrent::Array.new(recipients) + @enable_listening = enable_listening + @executor = executor + + @mutex = Mutex.new + @shutdown_event = Concurrent::Event.new.tap(&:set) + @running = Concurrent::AtomicBoolean.new(false) @connected = Concurrent::AtomicBoolean.new(false) @listening = Concurrent::AtomicBoolean.new(false) @connection_errors_count = Concurrent::AtomicFixnum.new(0) @connection_errors_reported = Concurrent::AtomicBoolean.new(false) @enable_listening = enable_listening + @task = nil - create_executor - listen + start self.class.instances << self end + def running? + @executor.running? && @running.true? + end + # Tests whether the notifier is active and has acquired a dedicated database connection. # @return [true, false, nil] def connected? @connected.true? end @@ -93,48 +94,47 @@ # @return [true, false, nil] def listening? @listening.true? end - # Tests whether the notifier is running. - # @!method running? - # @return [true, false, nil] - delegate :running?, to: :executor, allow_nil: true + def shutdown? + @shutdown_event.set? + end - # Tests whether the scheduler is shutdown. - # @!method shutdown? - # @return [true, false, nil] - delegate :shutdown?, to: :executor, allow_nil: true - # Shut down the notifier. # This stops the background LISTENing thread. # Use {#shutdown?} to determine whether threads have stopped. # @param timeout [Numeric, nil] Seconds to wait for active threads. # * +nil+, the scheduler will trigger a shutdown but not wait for it to complete. # * +-1+, the scheduler will wait until the shutdown is complete. - # * +0+, the scheduler will immediately shutdown and stop any threads. # * A positive number will wait that many seconds before stopping any remaining active threads. # @return [void] def shutdown(timeout: -1) - return if executor.nil? || executor.shutdown? + synchronize do + @running.make_false - executor.shutdown if executor.running? - - if executor.shuttingdown? && timeout # rubocop:disable Style/GuardClause - executor_wait = timeout.negative? ? nil : timeout - executor.kill unless executor.wait_for_termination(executor_wait) + if @executor.shutdown? || @task&.complete? + # clean up in the even the executor is killed + @connected.make_false + @listening.make_false + @shutdown_event.set + else + @shutdown_event.wait(timeout == -1 ? nil : timeout) unless timeout.nil? + end + @shutdown_event.set? end end # Restart the notifier. # When shutdown, start; or shutdown and start. # @param timeout [nil, Numeric] Seconds to wait; shares same values as {#shutdown}. # @return [void] def restart(timeout: -1) - shutdown(timeout: timeout) if running? - create_executor - listen + synchronize do + shutdown(timeout: timeout) unless @shutdown_event.set? + start + end end # Invoked on completion of ThreadPoolExecutor task # @!visibility private # @return [void] @@ -158,25 +158,31 @@ else GoodJob._on_thread_error(thread_error) end end - return if shutdown? - - listen(delay: connection_error ? RECONNECT_INTERVAL : 0) + if @running.true? + create_listen_task(delay: connection_error ? RECONNECT_INTERVAL : 0) + else + @shutdown_event.set + end end private - attr_reader :executor + def start + synchronize do + return if @running.true? - def create_executor - @executor = Concurrent::ThreadPoolExecutor.new(EXECUTOR_OPTIONS) + @running.make_true + @shutdown_event.reset + create_listen_task(delay: 0) + end end - def listen(delay: 0) - future = Concurrent::ScheduledTask.new(delay, args: [@recipients, executor, @enable_listening, @listening], executor: @executor) do |thr_recipients, thr_executor, thr_enable_listening, thr_listening| + def create_listen_task(delay: 0) + @task = Concurrent::ScheduledTask.new(delay, args: [@recipients, @running, @executor, @enable_listening, @listening], executor: @executor) do |thr_recipients, thr_running, thr_executor, thr_enable_listening, thr_listening| with_connection do begin Rails.application.executor.wrap do run_callbacks :listen do if thr_enable_listening @@ -186,11 +192,11 @@ end end end end - while thr_executor.running? + while thr_executor.running? && thr_running.true? run_callbacks :tick do wait_for_notify do |channel, payload| next unless channel == CHANNEL ActiveSupport::Notifications.instrument("notifier_notified.good_job", { payload: payload }) @@ -217,12 +223,13 @@ end end end end - future.add_observer(self, :listen_observer) - future.execute + @task.add_observer(self, :listen_observer) + @task.execute + @task end def with_connection Rails.application.executor.wrap do self.connection = Execution.connection_pool.checkout.tap do |conn| @@ -260,8 +267,16 @@ end def reset_connection_errors @connection_errors_count.value = 0 @connection_errors_reported.make_false + end + + def synchronize(&block) + if @mutex.owned? + yield + else + @mutex.synchronize(&block) + end end end end