lib/good_job/notifier.rb in good_job-1.7.1 vs lib/good_job/notifier.rb in good_job-1.8.0
- old
+ new
@@ -13,11 +13,11 @@
AdapterCannotListenError = Class.new(StandardError)
# Default Postgres channel for LISTEN/NOTIFY
CHANNEL = 'good_job'.freeze
# Defaults for instance of Concurrent::ThreadPoolExecutor
- POOL_OPTIONS = {
+ EXECUTOR_OPTIONS = {
name: name,
min_threads: 0,
max_threads: 1,
auto_terminate: true,
idletime: 60,
@@ -28,11 +28,11 @@
WAIT_INTERVAL = 1
# @!attribute [r] instances
# @!scope class
# List of all instantiated Notifiers in the current process.
- # @return [array<GoodJob:Adapter>]
+ # @return [Array<GoodJob:Adapter>]
cattr_reader :instances, default: [], instance_reader: false
# Send a message via Postgres NOTIFY
# @param message [#to_json]
def self.notify(message)
@@ -51,48 +51,57 @@
@recipients = Concurrent::Array.new(recipients)
@listening = Concurrent::AtomicBoolean.new(false)
self.class.instances << self
- create_pool
+ create_executor
listen
end
# Tests whether the notifier is active and listening for new messages.
# @return [true, false, nil]
def listening?
@listening.true?
end
- # Restart the notifier.
- # When shutdown, start; or shutdown and start.
- # @param wait [Boolean] Wait for background thread to finish
- # @return [void]
- def restart(wait: true)
- shutdown(wait: wait)
- create_pool
- listen
- end
+ # Tests whether the notifier is running.
+ # @return [true, false, nil]
+ delegate :running?, to: :executor, allow_nil: true
+ # Tests whether the scheduler is shutdown.
+ # @return [true, false, nil]
+ delegate :shutdown?, to: :executor, allow_nil: true
+
# Shut down the notifier.
# This stops the background LISTENing thread.
- # If +wait+ is +true+, the notifier will wait for background thread to shutdown.
- # If +wait+ is +false+, this method will return immediately even though threads may still be running.
# Use {#shutdown?} to determine whether threads have stopped.
- # @param wait [Boolean] Wait for actively executing threads to finish
+ # @param timeout [nil, Numeric] Seconds to wait for active threads.
+ #
+ # * +nil+, the scheduler will trigger a shutdown but not wait for it to complete.
+ # * +-1+, the scheduler will wait until the shutdown is complete.
+ # * +0+, the scheduler will immediately shutdown and stop any threads.
+ # * A positive number will wait that many seconds before stopping any remaining active threads.
# @return [void]
- def shutdown(wait: true)
- return unless @pool.running?
+ def shutdown(timeout: -1)
+ return if executor.nil? || executor.shutdown?
- @pool.shutdown
- @pool.wait_for_termination if wait
+ executor.shutdown if executor.running?
+
+ if executor.shuttingdown? && timeout # rubocop:disable Style/GuardClause
+ executor_wait = timeout.negative? ? nil : timeout
+ executor.kill unless executor.wait_for_termination(executor_wait)
+ end
end
- # Tests whether the notifier is shutdown.
- # @return [true, false, nil]
- def shutdown?
- !@pool.running?
+ # Restart the notifier.
+ # When shutdown, start; or shutdown and start.
+ # @param timeout [nil, Numeric] Seconds to wait; shares same values as {#shutdown}.
+ # @return [void]
+ def restart(timeout: -1)
+ shutdown(timeout: timeout) if running?
+ create_executor
+ listen
end
# Invoked on completion of ThreadPoolExecutor task
# @!visibility private
# @return [void]
@@ -107,39 +116,39 @@
listen unless shutdown?
end
private
- def create_pool
- @pool = Concurrent::ThreadPoolExecutor.new(POOL_OPTIONS)
+ attr_reader :executor
+
+ def create_executor
+ @executor = Concurrent::ThreadPoolExecutor.new(EXECUTOR_OPTIONS)
end
def listen
- future = Concurrent::Future.new(args: [@recipients, @pool, @listening], executor: @pool) do |recipients, pool, listening|
+ future = Concurrent::Future.new(args: [@recipients, executor, @listening], executor: @executor) do |thr_recipients, thr_executor, thr_listening|
with_listen_connection do |conn|
ActiveSupport::Notifications.instrument("notifier_listen.good_job") do
conn.async_exec("LISTEN #{CHANNEL}").clear
end
ActiveSupport::Dependencies.interlock.permit_concurrent_loads do
- while pool.running?
- listening.make_true
+ thr_listening.make_true
+ while thr_executor.running?
conn.wait_for_notify(WAIT_INTERVAL) do |channel, _pid, payload|
- listening.make_false
next unless channel == CHANNEL
ActiveSupport::Notifications.instrument("notifier_notified.good_job", { payload: payload })
parsed_payload = JSON.parse(payload, symbolize_names: true)
- recipients.each do |recipient|
+ thr_recipients.each do |recipient|
target, method_name = recipient.is_a?(Array) ? recipient : [recipient, :call]
target.send(method_name, parsed_payload)
end
end
- listening.make_false
end
end
ensure
- listening.make_false
+ thr_listening.make_false
ActiveSupport::Notifications.instrument("notifier_unlisten.good_job") do
conn.async_exec("UNLISTEN *").clear
end
end
end