Sha256: 24eddd21b4fb6844220e326a6ea543acb6f8beb168ec6f950ae15544e60de47f

Contents?: true

Size: 1.92 KB

Versions: 5

Compression:

Stored size: 1.92 KB

Contents

# frozen_string_literal: true

module Delayed
  module Master
    module Postgresql
      class JobListener < Delayed::Master::JobListener
        def initialize(master)
          @master = master
          @config = master.config
          @databases = master.databases
          @threads = []
        end

        def start
          @threads = @databases.map do |database|
            Thread.new(database) do |database|
              loop do
                if @master.stop?
                  break
                else
                  listen(database)
                end
              end
            end
          end
        end

        def wait
          @threads.each(&:join)
        end

        def shutdown
          @threads.each(&:kill)
        end

        private

        def listen(database)
          database.with_connection do |connection|
            listen_connection(database, connection) do
              loop do
                if @master.stop?
                  break
                else
                  wait_for_notify(database, connection)
                end
              end
            end
          end
        end

        def listen_connection(database, connection)
          @master.logger.info { "listening @#{database.spec_name}..." }
          connection.execute("LISTEN delayed_job_master")
          yield
        rescue => e
          @master.logger.warn { "#{e.class}: #{e.message}" }
          @master.logger.debug { e.backtrace.join("\n") }
        ensure
          @master.logger.info { "unlisten @#{database.spec_name}" }
          connection.execute("UNLISTEN delayed_job_master")
        end

        def wait_for_notify(database, connection)
          connection.raw_connection.wait_for_notify(1) do |_event, _pid, _payload|
            @master.logger.info { "received notification @#{database.spec_name}" }
            @master.job_checker.schedule(database)
          end
        end
      end
    end
  end
end

Version data entries

5 entries across 5 versions & 1 rubygems

Version Path
delayed_job_master-3.1.2 lib/delayed/master/postgresql/job_listener.rb
delayed_job_master-3.1.1 lib/delayed/master/postgresql/job_listener.rb
delayed_job_master-3.1.0 lib/delayed/master/postgresql/job_listener.rb
delayed_job_master-3.0.1 lib/delayed/master/postgresql/job_listener.rb
delayed_job_master-3.0.0 lib/delayed/master/postgresql/job_listener.rb