lib/good_job/notifier.rb in good_job-1.7.1 vs lib/good_job/notifier.rb in good_job-1.8.0

- old
+ new

@@ -13,11 +13,11 @@ AdapterCannotListenError = Class.new(StandardError) # Default Postgres channel for LISTEN/NOTIFY CHANNEL = 'good_job'.freeze # Defaults for instance of Concurrent::ThreadPoolExecutor - POOL_OPTIONS = { + EXECUTOR_OPTIONS = { name: name, min_threads: 0, max_threads: 1, auto_terminate: true, idletime: 60, @@ -28,11 +28,11 @@ WAIT_INTERVAL = 1 # @!attribute [r] instances # @!scope class # List of all instantiated Notifiers in the current process. - # @return [array<GoodJob:Adapter>] + # @return [Array<GoodJob:Adapter>] cattr_reader :instances, default: [], instance_reader: false # Send a message via Postgres NOTIFY # @param message [#to_json] def self.notify(message) @@ -51,48 +51,57 @@ @recipients = Concurrent::Array.new(recipients) @listening = Concurrent::AtomicBoolean.new(false) self.class.instances << self - create_pool + create_executor listen end # Tests whether the notifier is active and listening for new messages. # @return [true, false, nil] def listening? @listening.true? end - # Restart the notifier. - # When shutdown, start; or shutdown and start. - # @param wait [Boolean] Wait for background thread to finish - # @return [void] - def restart(wait: true) - shutdown(wait: wait) - create_pool - listen - end + # Tests whether the notifier is running. + # @return [true, false, nil] + delegate :running?, to: :executor, allow_nil: true + # Tests whether the scheduler is shutdown. + # @return [true, false, nil] + delegate :shutdown?, to: :executor, allow_nil: true + # Shut down the notifier. # This stops the background LISTENing thread. - # If +wait+ is +true+, the notifier will wait for background thread to shutdown. - # If +wait+ is +false+, this method will return immediately even though threads may still be running. # Use {#shutdown?} to determine whether threads have stopped. - # @param wait [Boolean] Wait for actively executing threads to finish + # @param timeout [nil, Numeric] Seconds to wait for active threads. + # + # * +nil+, the scheduler will trigger a shutdown but not wait for it to complete. + # * +-1+, the scheduler will wait until the shutdown is complete. + # * +0+, the scheduler will immediately shutdown and stop any threads. + # * A positive number will wait that many seconds before stopping any remaining active threads. # @return [void] - def shutdown(wait: true) - return unless @pool.running? + def shutdown(timeout: -1) + return if executor.nil? || executor.shutdown? - @pool.shutdown - @pool.wait_for_termination if wait + executor.shutdown if executor.running? + + if executor.shuttingdown? && timeout # rubocop:disable Style/GuardClause + executor_wait = timeout.negative? ? nil : timeout + executor.kill unless executor.wait_for_termination(executor_wait) + end end - # Tests whether the notifier is shutdown. - # @return [true, false, nil] - def shutdown? - !@pool.running? + # Restart the notifier. + # When shutdown, start; or shutdown and start. + # @param timeout [nil, Numeric] Seconds to wait; shares same values as {#shutdown}. + # @return [void] + def restart(timeout: -1) + shutdown(timeout: timeout) if running? + create_executor + listen end # Invoked on completion of ThreadPoolExecutor task # @!visibility private # @return [void] @@ -107,39 +116,39 @@ listen unless shutdown? end private - def create_pool - @pool = Concurrent::ThreadPoolExecutor.new(POOL_OPTIONS) + attr_reader :executor + + def create_executor + @executor = Concurrent::ThreadPoolExecutor.new(EXECUTOR_OPTIONS) end def listen - future = Concurrent::Future.new(args: [@recipients, @pool, @listening], executor: @pool) do |recipients, pool, listening| + future = Concurrent::Future.new(args: [@recipients, executor, @listening], executor: @executor) do |thr_recipients, thr_executor, thr_listening| with_listen_connection do |conn| ActiveSupport::Notifications.instrument("notifier_listen.good_job") do conn.async_exec("LISTEN #{CHANNEL}").clear end ActiveSupport::Dependencies.interlock.permit_concurrent_loads do - while pool.running? - listening.make_true + thr_listening.make_true + while thr_executor.running? conn.wait_for_notify(WAIT_INTERVAL) do |channel, _pid, payload| - listening.make_false next unless channel == CHANNEL ActiveSupport::Notifications.instrument("notifier_notified.good_job", { payload: payload }) parsed_payload = JSON.parse(payload, symbolize_names: true) - recipients.each do |recipient| + thr_recipients.each do |recipient| target, method_name = recipient.is_a?(Array) ? recipient : [recipient, :call] target.send(method_name, parsed_payload) end end - listening.make_false end end ensure - listening.make_false + thr_listening.make_false ActiveSupport::Notifications.instrument("notifier_unlisten.good_job") do conn.async_exec("UNLISTEN *").clear end end end