Sha256: de55270642c62ca442351d9c008c67ce3b1f406d609e72fc882c08cde7ceccf2
Contents?: true
Size: 1.12 KB
Versions: 3
Compression:
Stored size: 1.12 KB
Contents
module Toro class Listener include Actor def initialize(options={}) defaults = { queues: [Toro.options[:default_queue]] } options.reverse_merge!(defaults) @queues = options[:queues] @fetcher = options[:fetcher] @manager = options[:manager] @is_done = false raise 'No fetcher provided' if @fetcher.blank? raise 'No manager provided' if @manager.blank? end def start @manager.register_actor(:listener, self) Toro::Database.with_connection do Toro::Database.raw_connection.async_exec(channels.map { |channel| "LISTEN #{channel}" }.join('; ')) wait_for_notify end end def stop Toro::Database.raw_connection.async_exec(channels.map { |channel| "UNLISTEN #{channel}" }.join('; ')) @is_done = true end protected def wait_for_notify Toro::Database.raw_connection.wait_for_notify(Toro.options[:listen_interval]) do |channel, pid, payload| @fetcher.notify end wait_for_notify unless @is_done end def channels @queues.map { |queue| "toro_#{queue}" } end end end
Version data entries
3 entries across 3 versions & 1 rubygems
Version | Path |
---|---|
toro-0.1.0 | lib/toro/listener.rb |
toro-0.0.3 | lib/toro/listener.rb |
toro-0.0.2 | lib/toro/listener.rb |