lib/good_job/notifier.rb in good_job-3.8.0 vs lib/good_job/notifier.rb in good_job-3.9.0

- old
+ new

@@ -166,50 +166,54 @@ def listen(delay: 0) future = Concurrent::ScheduledTask.new(delay, args: [@recipients, executor, @listening], executor: @executor) do |thr_recipients, thr_executor, thr_listening| with_connection do begin - run_callbacks :listen do - ActiveSupport::Notifications.instrument("notifier_listen.good_job") do - connection.execute("LISTEN #{CHANNEL}") + Rails.application.executor.wrap do + run_callbacks :listen do + ActiveSupport::Notifications.instrument("notifier_listen.good_job") do + connection.execute("LISTEN #{CHANNEL}") + end + thr_listening.make_true end - thr_listening.make_true end - ActiveSupport::Dependencies.interlock.permit_concurrent_loads do - while thr_executor.running? - wait_for_notify do |channel, payload| - next unless channel == CHANNEL + while thr_executor.running? + wait_for_notify do |channel, payload| + next unless channel == CHANNEL - ActiveSupport::Notifications.instrument("notifier_notified.good_job", { payload: payload }) - parsed_payload = JSON.parse(payload, symbolize_names: true) - thr_recipients.each do |recipient| - target, method_name = recipient.is_a?(Array) ? recipient : [recipient, :call] - target.send(method_name, parsed_payload) - end + ActiveSupport::Notifications.instrument("notifier_notified.good_job", { payload: payload }) + parsed_payload = JSON.parse(payload, symbolize_names: true) + thr_recipients.each do |recipient| + target, method_name = recipient.is_a?(Array) ? recipient : [recipient, :call] + target.send(method_name, parsed_payload) end - - reset_connection_errors end + + reset_connection_errors end end ensure - run_callbacks :unlisten do - thr_listening.make_false - ActiveSupport::Notifications.instrument("notifier_unlisten.good_job") do - connection.execute("UNLISTEN *") + Rails.application.executor.wrap do + run_callbacks :unlisten do + thr_listening.make_false + ActiveSupport::Notifications.instrument("notifier_unlisten.good_job") do + connection.execute("UNLISTEN *") + end end end end end future.add_observer(self, :listen_observer) future.execute end def with_connection - self.connection = Execution.connection_pool.checkout.tap do |conn| - Execution.connection_pool.remove(conn) + Rails.application.executor.wrap do + self.connection = Execution.connection_pool.checkout.tap do |conn| + Execution.connection_pool.remove(conn) + end end connection.execute("SET application_name = #{connection.quote(self.class.name)}") yield ensure