lib/good_job/notifier.rb in good_job-1.3.5 vs lib/good_job/notifier.rb in good_job-1.3.6
- old
+ new
@@ -88,21 +88,33 @@
# @return [true, false, nil]
def shutdown?
!@pool.running?
end
+ # Invoked on completion of ThreadPoolExecutor task
+ # @!visibility private
+ # @return [void]
+ def listen_observer(_time, _result, thread_error)
+ if thread_error
+ GoodJob.on_thread_error.call(thread_error) if GoodJob.on_thread_error.respond_to?(:call)
+ ActiveSupport::Notifications.instrument("notifier_notify_error.good_job", { error: thread_error })
+ end
+
+ listen unless shutdown?
+ end
+
private
def create_pool
@pool = Concurrent::ThreadPoolExecutor.new(POOL_OPTIONS)
end
def listen
future = Concurrent::Future.new(args: [@recipients, @pool, @listening], executor: @pool) do |recipients, pool, listening|
with_listen_connection do |conn|
ActiveSupport::Notifications.instrument("notifier_listen.good_job") do
- conn.async_exec "LISTEN #{CHANNEL}"
+ conn.async_exec("LISTEN #{CHANNEL}").clear
end
ActiveSupport::Dependencies.interlock.permit_concurrent_loads do
while pool.running?
listening.make_true
@@ -118,34 +130,27 @@
end
end
listening.make_false
end
end
+ ensure
+ listening.make_false
+ ActiveSupport::Notifications.instrument("notifier_unlisten.good_job") do
+ conn.async_exec("UNLISTEN *").clear
+ end
end
- rescue StandardError => e
- ActiveSupport::Notifications.instrument("notifier_notify_error.good_job", { error: e })
- raise
- ensure
- @listening.make_false
- ActiveSupport::Notifications.instrument("notifier_unlisten.good_job") do
- conn.async_exec "UNLISTEN *"
- end
end
future.add_observer(self, :listen_observer)
future.execute
end
- def listen_observer(_time, _result, _thread_error)
- listen unless shutdown?
- end
-
def with_listen_connection
ar_conn = ActiveRecord::Base.connection_pool.checkout.tap do |conn|
ActiveRecord::Base.connection_pool.remove(conn)
end
pg_conn = ar_conn.raw_connection
- pg_conn.exec("SET application_name = #{pg_conn.escape_identifier(self.class.name)}")
+ pg_conn.async_exec("SET application_name = #{pg_conn.escape_identifier(self.class.name)}").clear
yield pg_conn
ensure
ar_conn&.disconnect!
end
end