lib/good_job/notifier.rb in good_job-3.8.0 vs lib/good_job/notifier.rb in good_job-3.9.0
- old
+ new
@@ -166,50 +166,54 @@
def listen(delay: 0)
future = Concurrent::ScheduledTask.new(delay, args: [@recipients, executor, @listening], executor: @executor) do |thr_recipients, thr_executor, thr_listening|
with_connection do
begin
- run_callbacks :listen do
- ActiveSupport::Notifications.instrument("notifier_listen.good_job") do
- connection.execute("LISTEN #{CHANNEL}")
+ Rails.application.executor.wrap do
+ run_callbacks :listen do
+ ActiveSupport::Notifications.instrument("notifier_listen.good_job") do
+ connection.execute("LISTEN #{CHANNEL}")
+ end
+ thr_listening.make_true
end
- thr_listening.make_true
end
- ActiveSupport::Dependencies.interlock.permit_concurrent_loads do
- while thr_executor.running?
- wait_for_notify do |channel, payload|
- next unless channel == CHANNEL
+ while thr_executor.running?
+ wait_for_notify do |channel, payload|
+ next unless channel == CHANNEL
- ActiveSupport::Notifications.instrument("notifier_notified.good_job", { payload: payload })
- parsed_payload = JSON.parse(payload, symbolize_names: true)
- thr_recipients.each do |recipient|
- target, method_name = recipient.is_a?(Array) ? recipient : [recipient, :call]
- target.send(method_name, parsed_payload)
- end
+ ActiveSupport::Notifications.instrument("notifier_notified.good_job", { payload: payload })
+ parsed_payload = JSON.parse(payload, symbolize_names: true)
+ thr_recipients.each do |recipient|
+ target, method_name = recipient.is_a?(Array) ? recipient : [recipient, :call]
+ target.send(method_name, parsed_payload)
end
-
- reset_connection_errors
end
+
+ reset_connection_errors
end
end
ensure
- run_callbacks :unlisten do
- thr_listening.make_false
- ActiveSupport::Notifications.instrument("notifier_unlisten.good_job") do
- connection.execute("UNLISTEN *")
+ Rails.application.executor.wrap do
+ run_callbacks :unlisten do
+ thr_listening.make_false
+ ActiveSupport::Notifications.instrument("notifier_unlisten.good_job") do
+ connection.execute("UNLISTEN *")
+ end
end
end
end
end
future.add_observer(self, :listen_observer)
future.execute
end
def with_connection
- self.connection = Execution.connection_pool.checkout.tap do |conn|
- Execution.connection_pool.remove(conn)
+ Rails.application.executor.wrap do
+ self.connection = Execution.connection_pool.checkout.tap do |conn|
+ Execution.connection_pool.remove(conn)
+ end
end
connection.execute("SET application_name = #{connection.quote(self.class.name)}")
yield
ensure