Sha256: de55270642c62ca442351d9c008c67ce3b1f406d609e72fc882c08cde7ceccf2

Contents?: true

Size: 1.12 KB

Versions: 3

Compression:

Stored size: 1.12 KB

Contents

module Toro
  # Subscribes to one PostgreSQL notification channel per queue
  # ("toro_<queue>") and calls +notify+ on the fetcher every time a
  # notification arrives on any of them.
  class Listener
    include Actor

    # @param options [Hash]
    # @option options [Array] :queues queue names to listen on; defaults to
    #   a single-element array holding Toro.options[:default_queue]
    # @option options [Object] :fetcher receiver of +notify+ (required)
    # @option options [Object] :manager receiver of +register_actor+ (required)
    # @raise [RuntimeError] if the fetcher or the manager is blank
    def initialize(options = {})
      defaults = {
        queues: [Toro.options[:default_queue]]
      }
      # Non-destructive merge: the caller's hash must not be modified.
      options = options.reverse_merge(defaults)
      @queues = options[:queues]
      @fetcher = options[:fetcher]
      @manager = options[:manager]
      @is_done = false
      raise 'No fetcher provided' if @fetcher.blank?
      raise 'No manager provided' if @manager.blank?
    end

    # Registers this listener with the manager, issues LISTEN for every
    # channel, and then waits for notifications until #stop has been called.
    # The whole wait happens inside Toro::Database.with_connection.
    def start
      @manager.register_actor(:listener, self)
      Toro::Database.with_connection do
        Toro::Database.raw_connection.async_exec(channel_statements('LISTEN'))
        wait_for_notify
      end
    end

    # Issues UNLISTEN for every channel and flags the wait loop to finish.
    #
    # The flag is set in an +ensure+ so that a failing UNLISTEN cannot leave
    # the wait loop running forever; the error itself still propagates.
    #
    # @return [true]
    def stop
      Toro::Database.raw_connection.async_exec(channel_statements('UNLISTEN'))
      true
    ensure
      @is_done = true
    end

    protected

    # Waits for notifications, pinging the fetcher for each one, until
    # @is_done is set. Always waits at least once.
    #
    # Written as a loop rather than self-recursion: every wait that returns
    # (notification or timeout) would otherwise add a stack frame, and a
    # long-lived listener would end in SystemStackError.
    def wait_for_notify
      loop do
        # Toro.options[:listen_interval] is handed to the connection as the
        # wait timeout (presumably seconds, per the pg driver -- confirm).
        Toro::Database.raw_connection.wait_for_notify(Toro.options[:listen_interval]) do |_channel, _pid, _payload|
          @fetcher.notify
        end
        break if @is_done
      end
    end

    # @return [Array<String>] channel names, one per queue: "toro_<queue>"
    def channels
      @queues.map { |queue| "toro_#{queue}" }
    end

    # Builds a single SQL string applying +command+ to every channel,
    # e.g. "LISTEN toro_a; LISTEN toro_b".
    #
    # Channel names are interpolated unquoted on purpose: quoting would
    # change how PostgreSQL case-folds the identifier.
    # NOTE(review): assumes queue names are trusted configuration values.
    #
    # @param command [String] "LISTEN" or "UNLISTEN"
    # @return [String]
    def channel_statements(command)
      channels.map { |channel| "#{command} #{channel}" }.join('; ')
    end
  end
end

Version data entries

3 entries across 3 versions & 1 rubygems

Version Path
toro-0.1.0 lib/toro/listener.rb
toro-0.0.3 lib/toro/listener.rb
toro-0.0.2 lib/toro/listener.rb