lib/good_job/notifier.rb in good_job-1.2.4 vs lib/good_job/notifier.rb in good_job-1.2.5
- old
+ new
@@ -1,65 +1,93 @@
require 'concurrent/atomic/atomic_boolean'
module GoodJob # :nodoc:
#
- # Wrapper for Postgres LISTEN/NOTIFY
+ # Notifiers hook into Postgres LISTEN/NOTIFY functionality to emit and listen for notifications across processes.
#
+ # Notifiers can emit NOTIFY messages through Postgres.
+ # A notifier will LISTEN for messages by creating a background thread that runs in an instance of +Concurrent::ThreadPoolExecutor+.
+ # When a message is received, the notifier passes the message to each of its recipients.
+ #
class Notifier
+ # Default Postgres channel for LISTEN/NOTIFY
CHANNEL = 'good_job'.freeze
+ # Defaults for instance of Concurrent::ThreadPoolExecutor
POOL_OPTIONS = {
name: name,
min_threads: 0,
max_threads: 1,
auto_terminate: true,
idletime: 60,
max_queue: 1,
fallback_policy: :discard,
}.freeze
+ # Seconds to block while LISTENing for a message
WAIT_INTERVAL = 1
# @!attribute [r] instances
# @!scope class
- # @return [array<GoodJob:Adapter>] the instances of +GoodJob::Notifier+
+ # List of all instantiated Notifiers in the current process.
+ # @return [array<GoodJob:Adapter>]
cattr_reader :instances, default: [], instance_reader: false
+ # Send a message via Postgres NOTIFY
+ # @param message [#to_json]
def self.notify(message)
connection = ActiveRecord::Base.connection
connection.exec_query <<~SQL
NOTIFY #{CHANNEL}, #{connection.quote(message.to_json)}
SQL
end
+ # List of recipients that will receive notifications.
+ # @return [Array<#call, Array(Object, Symbol)>]
attr_reader :recipients
+ # @param recipients [Array<#call, Array(Object, Symbol)>]
def initialize(*recipients)
@recipients = Concurrent::Array.new(recipients)
@listening = Concurrent::AtomicBoolean.new(false)
self.class.instances << self
create_pool
listen
end
+ # Tests whether the notifier is active and listening for new messages.
+ # @return [true, false, nil]
def listening?
@listening.true?
end
+ # Restart the notifier.
+ # When shutdown, start; or shutdown and start.
+ # @param wait [Boolean] Wait for background thread to finish
+ # @return [void]
def restart(wait: true)
shutdown(wait: wait)
create_pool
listen
end
+ # Shut down the notifier.
+ # This stops the background LISTENing thread.
+ # If +wait+ is +true+, the notifier will wait for background thread to shutdown.
+ # If +wait+ is +false+, this method will return immediately even though threads may still be running.
+ # Use {#shutdown?} to determine whether threads have stopped.
+ # @param wait [Boolean] Wait for actively executing jobs to finish
+ # @return [void]
def shutdown(wait: true)
return unless @pool.running?
@pool.shutdown
@pool.wait_for_termination if wait
end
+ # Tests whether the notifier is shutdown.
+ # @return [true, false, nil]
def shutdown?
!@pool.running?
end
private
@@ -68,41 +96,39 @@
@pool = Concurrent::ThreadPoolExecutor.new(POOL_OPTIONS)
end
def listen
future = Concurrent::Future.new(args: [@recipients, @pool, @listening], executor: @pool) do |recipients, pool, listening|
- begin
- with_listen_connection do |conn|
- ActiveSupport::Notifications.instrument("notifier_listen.good_job") do
- conn.async_exec "LISTEN #{CHANNEL}"
- end
+ with_listen_connection do |conn|
+ ActiveSupport::Notifications.instrument("notifier_listen.good_job") do
+ conn.async_exec "LISTEN #{CHANNEL}"
+ end
- ActiveSupport::Dependencies.interlock.permit_concurrent_loads do
- while pool.running?
- listening.make_true
- conn.wait_for_notify(WAIT_INTERVAL) do |channel, _pid, payload|
- listening.make_false
- next unless channel == CHANNEL
+ ActiveSupport::Dependencies.interlock.permit_concurrent_loads do
+ while pool.running?
+ listening.make_true
+ conn.wait_for_notify(WAIT_INTERVAL) do |channel, _pid, payload|
+ listening.make_false
+ next unless channel == CHANNEL
- ActiveSupport::Notifications.instrument("notifier_notified.good_job", { payload: payload })
- parsed_payload = JSON.parse(payload, symbolize_names: true)
- recipients.each do |recipient|
- target, method_name = recipient.is_a?(Array) ? recipient : [recipient, :call]
- target.send(method_name, parsed_payload)
- end
+ ActiveSupport::Notifications.instrument("notifier_notified.good_job", { payload: payload })
+ parsed_payload = JSON.parse(payload, symbolize_names: true)
+ recipients.each do |recipient|
+ target, method_name = recipient.is_a?(Array) ? recipient : [recipient, :call]
+ target.send(method_name, parsed_payload)
end
- listening.make_false
end
+ listening.make_false
end
end
- rescue StandardError => e
- ActiveSupport::Notifications.instrument("notifier_notify_error.good_job", { error: e })
- raise
- ensure
- @listening.make_false
- ActiveSupport::Notifications.instrument("notifier_unlisten.good_job") do
- conn.async_exec "UNLISTEN *"
- end
+ end
+ rescue StandardError => e
+ ActiveSupport::Notifications.instrument("notifier_notify_error.good_job", { error: e })
+ raise
+ ensure
+ @listening.make_false
+ ActiveSupport::Notifications.instrument("notifier_unlisten.good_job") do
+ conn.async_exec "UNLISTEN *"
end
end
future.add_observer(self, :listen_observer)
future.execute