lib/good_job/notifier.rb in good_job-1.3.5 vs lib/good_job/notifier.rb in good_job-1.3.6

- old
+ new

@@ -88,21 +88,33 @@ # @return [true, false, nil] def shutdown? !@pool.running? end + # Invoked on completion of ThreadPoolExecutor task + # @!visibility private + # @return [void] + def listen_observer(_time, _result, thread_error) + if thread_error + GoodJob.on_thread_error.call(thread_error) if GoodJob.on_thread_error.respond_to?(:call) + ActiveSupport::Notifications.instrument("notifier_notify_error.good_job", { error: thread_error }) + end + + listen unless shutdown? + end + private def create_pool @pool = Concurrent::ThreadPoolExecutor.new(POOL_OPTIONS) end def listen future = Concurrent::Future.new(args: [@recipients, @pool, @listening], executor: @pool) do |recipients, pool, listening| with_listen_connection do |conn| ActiveSupport::Notifications.instrument("notifier_listen.good_job") do - conn.async_exec "LISTEN #{CHANNEL}" + conn.async_exec("LISTEN #{CHANNEL}").clear end ActiveSupport::Dependencies.interlock.permit_concurrent_loads do while pool.running? listening.make_true @@ -118,34 +130,27 @@ end end listening.make_false end end + ensure + listening.make_false + ActiveSupport::Notifications.instrument("notifier_unlisten.good_job") do + conn.async_exec("UNLISTEN *").clear + end end - rescue StandardError => e - ActiveSupport::Notifications.instrument("notifier_notify_error.good_job", { error: e }) - raise - ensure - @listening.make_false - ActiveSupport::Notifications.instrument("notifier_unlisten.good_job") do - conn.async_exec "UNLISTEN *" - end end future.add_observer(self, :listen_observer) future.execute end - def listen_observer(_time, _result, _thread_error) - listen unless shutdown? - end - def with_listen_connection ar_conn = ActiveRecord::Base.connection_pool.checkout.tap do |conn| ActiveRecord::Base.connection_pool.remove(conn) end pg_conn = ar_conn.raw_connection - pg_conn.exec("SET application_name = #{pg_conn.escape_identifier(self.class.name)}") + pg_conn.async_exec("SET application_name = #{pg_conn.escape_identifier(self.class.name)}").clear yield pg_conn ensure ar_conn&.disconnect! end end