lib/good_job/notifier.rb in good_job-1.2.1 vs lib/good_job/notifier.rb in good_job-1.2.2

- old
+ new

@@ -5,10 +5,11 @@ # Wrapper for Postgres LISTEN/NOTIFY # class Notifier CHANNEL = 'good_job'.freeze POOL_OPTIONS = { + name: name, min_threads: 0, max_threads: 1, auto_terminate: true, idletime: 60, max_queue: 1, @@ -67,17 +68,16 @@ @pool = Concurrent::ThreadPoolExecutor.new(POOL_OPTIONS) end def listen future = Concurrent::Future.new(args: [@recipients, @pool, @listening], executor: @pool) do |recipients, pool, listening| - Rails.application.reloader.wrap do - conn = ActiveRecord::Base.connection.raw_connection - ActiveSupport::Notifications.instrument("notifier_listen.good_job") do - conn.async_exec "LISTEN #{CHANNEL}" - end + begin + with_listen_connection do |conn| + ActiveSupport::Notifications.instrument("notifier_listen.good_job") do + conn.async_exec "LISTEN #{CHANNEL}" + end - begin ActiveSupport::Dependencies.interlock.permit_concurrent_loads do while pool.running? listening.make_true conn.wait_for_notify(WAIT_INTERVAL) do |channel, _pid, payload| listening.make_false @@ -91,26 +91,37 @@ end end listening.make_false end end - rescue StandardError => e - ActiveSupport::Notifications.instrument("notifier_notify_error.good_job", { error: e }) - raise - ensure - @listening.make_false - ActiveSupport::Notifications.instrument("notifier_unlisten.good_job") do - conn.async_exec "UNLISTEN *" - end end + rescue StandardError => e + ActiveSupport::Notifications.instrument("notifier_notify_error.good_job", { error: e }) + raise + ensure + @listening.make_false + ActiveSupport::Notifications.instrument("notifier_unlisten.good_job") do + conn.async_exec "UNLISTEN *" + end end end future.add_observer(self, :listen_observer) future.execute end def listen_observer(_time, _result, _thread_error) listen unless shutdown? + end + + def with_listen_connection + ar_conn = ActiveRecord::Base.connection_pool.checkout.tap do |conn| + ActiveRecord::Base.connection_pool.remove(conn) + end + pg_conn = ar_conn.raw_connection + pg_conn.exec("SET application_name = #{pg_conn.escape_identifier(self.class.name)}") + yield pg_conn + ensure + ar_conn.disconnect! end end end