lib/good_job/notifier.rb in good_job-1.2.1 vs lib/good_job/notifier.rb in good_job-1.2.2
- old
+ new
@@ -5,10 +5,11 @@
# Wrapper for Postgres LISTEN/NOTIFY
#
class Notifier
CHANNEL = 'good_job'.freeze
POOL_OPTIONS = {
+ name: name,
min_threads: 0,
max_threads: 1,
auto_terminate: true,
idletime: 60,
max_queue: 1,
@@ -67,17 +68,16 @@
@pool = Concurrent::ThreadPoolExecutor.new(POOL_OPTIONS)
end
def listen
future = Concurrent::Future.new(args: [@recipients, @pool, @listening], executor: @pool) do |recipients, pool, listening|
- Rails.application.reloader.wrap do
- conn = ActiveRecord::Base.connection.raw_connection
- ActiveSupport::Notifications.instrument("notifier_listen.good_job") do
- conn.async_exec "LISTEN #{CHANNEL}"
- end
+ begin
+ with_listen_connection do |conn|
+ ActiveSupport::Notifications.instrument("notifier_listen.good_job") do
+ conn.async_exec "LISTEN #{CHANNEL}"
+ end
- begin
ActiveSupport::Dependencies.interlock.permit_concurrent_loads do
while pool.running?
listening.make_true
conn.wait_for_notify(WAIT_INTERVAL) do |channel, _pid, payload|
listening.make_false
@@ -91,26 +91,37 @@
end
end
listening.make_false
end
end
- rescue StandardError => e
- ActiveSupport::Notifications.instrument("notifier_notify_error.good_job", { error: e })
- raise
- ensure
- @listening.make_false
- ActiveSupport::Notifications.instrument("notifier_unlisten.good_job") do
- conn.async_exec "UNLISTEN *"
- end
end
+ rescue StandardError => e
+ ActiveSupport::Notifications.instrument("notifier_notify_error.good_job", { error: e })
+ raise
+ ensure
+ @listening.make_false
+ ActiveSupport::Notifications.instrument("notifier_unlisten.good_job") do
+ conn.async_exec "UNLISTEN *"
+ end
end
end
future.add_observer(self, :listen_observer)
future.execute
end
def listen_observer(_time, _result, _thread_error)
listen unless shutdown?
+ end
+
+ def with_listen_connection
+ ar_conn = ActiveRecord::Base.connection_pool.checkout.tap do |conn|
+ ActiveRecord::Base.connection_pool.remove(conn)
+ end
+ pg_conn = ar_conn.raw_connection
+ pg_conn.exec("SET application_name = #{pg_conn.escape_identifier(self.class.name)}")
+ yield pg_conn
+ ensure
+ ar_conn.disconnect!
end
end
end