lib/good_job/notifier.rb in good_job-3.18.2 vs lib/good_job/notifier.rb in good_job-3.18.3

- old
+ new

@@ -22,18 +22,20 @@ CHANNEL = 'good_job' # Seconds to block while LISTENing for a message WAIT_INTERVAL = 1 # Seconds to wait if database cannot be connected to RECONNECT_INTERVAL = 5 + # Number of consecutive connection errors before reporting an error + CONNECTION_ERRORS_REPORTING_THRESHOLD = 6 + # Connection errors that will wait {RECONNECT_INTERVAL} before reconnecting CONNECTION_ERRORS = %w[ ActiveRecord::ConnectionNotEstablished ActiveRecord::StatementInvalid PG::UnableToSend PG::Error ].freeze - CONNECTION_ERRORS_REPORTING_THRESHOLD = 3 # @!attribute [r] instances # @!scope class # List of all instantiated Notifiers in the current process. # @return [Array<GoodJob::Notifier>, nil] @@ -67,12 +69,12 @@ @executor = executor @mutex = Mutex.new @shutdown_event = Concurrent::Event.new.tap(&:set) @running = Concurrent::AtomicBoolean.new(false) - @connected = Concurrent::AtomicBoolean.new(false) - @listening = Concurrent::AtomicBoolean.new(false) + @connected = Concurrent::Event.new + @listening = Concurrent::Event.new @connection_errors_count = Concurrent::AtomicFixnum.new(0) @connection_errors_reported = Concurrent::AtomicBoolean.new(false) @enable_listening = enable_listening @task = nil @@ -83,19 +85,29 @@ def running? @executor.running? && @running.true? end # Tests whether the notifier is active and has acquired a dedicated database connection. + # @param timeout [Numeric, nil] Seconds to wait for condition to be true, -1 is forever # @return [true, false, nil] - def connected? - @connected.true? + def connected?(timeout: nil) + if timeout.nil? + @connected.set? + else + @connected.wait(timeout == -1 ? nil : timeout) + end end # Tests whether the notifier is listening for new messages. + # @param timeout [Numeric, nil] Seconds to wait for condition to be true, -1 is forever # @return [true, false, nil] - def listening? - @listening.true? + def listening?(timeout: nil) + if timeout.nil? + @listening.set? + else + @listening.wait(timeout == -1 ? nil : timeout) + end end def shutdown? @shutdown_event.set? end @@ -112,15 +124,16 @@ synchronize do @running.make_false if @executor.shutdown? || @task&.complete? # clean up in the even the executor is killed - @connected.make_false - @listening.make_false + @connected.reset + @listening.reset @shutdown_event.set else @shutdown_event.wait(timeout == -1 ? nil : timeout) unless timeout.nil? + @connected.reset if @shutdown_event.set? end @shutdown_event.set? end end @@ -150,10 +163,11 @@ end if connection_error @connection_errors_count.increment if @connection_errors_reported.false? && @connection_errors_count.value >= CONNECTION_ERRORS_REPORTING_THRESHOLD + @connected.reset GoodJob._on_thread_error(thread_error) @connection_errors_reported.make_true end else GoodJob._on_thread_error(thread_error) @@ -178,19 +192,21 @@ create_listen_task(delay: 0) end end def create_listen_task(delay: 0) - @task = Concurrent::ScheduledTask.new(delay, args: [@recipients, @running, @executor, @enable_listening, @listening], executor: @executor) do |thr_recipients, thr_running, thr_executor, thr_enable_listening, thr_listening| + @task = Concurrent::ScheduledTask.new(delay, args: [@recipients, @running, @executor, @enable_listening, @connected, @listening], executor: @executor) do |thr_recipients, thr_running, thr_executor, thr_enable_listening, thr_connected, thr_listening| with_connection do + thr_connected.set + begin Rails.application.executor.wrap do run_callbacks :listen do if thr_enable_listening ActiveSupport::Notifications.instrument("notifier_listen.good_job") do connection.execute("LISTEN #{CHANNEL}") - thr_listening.make_true + thr_listening.set end end end end @@ -214,11 +230,11 @@ ensure Rails.application.executor.wrap do run_callbacks :unlisten do if thr_enable_listening ActiveSupport::Notifications.instrument("notifier_unlisten.good_job") do - thr_listening.make_false + thr_listening.reset connection.execute("UNLISTEN *") end end end end @@ -235,14 +251,12 @@ self.connection = Execution.connection_pool.checkout.tap do |conn| Execution.connection_pool.remove(conn) end end connection.execute("SET application_name = #{connection.quote(self.class.name)}") - @connected.make_true yield ensure - @connected.make_false connection&.disconnect! self.connection = nil end def wait_for_notify