lib/qless/subscriber.rb in qless-0.9.3 vs lib/qless/subscriber.rb in qless-0.10.0

- old
+ new

@@ -1,48 +1,63 @@ +# Encoding: utf-8 + require 'thread' -require 'qless/wait_until' module Qless + # A class used for subscribing to messages in a thread class Subscriber def self.start(*args, &block) - new(*args, &block).start_pub_sub_listener + new(*args, &block).tap(&:start) end - attr_reader :client, :channel + attr_reader :channel, :redis - def initialize(client, channel, &message_received_callback) - @client = client + def initialize(client, channel, options = {}, &message_received_callback) @channel = channel @message_received_callback = message_received_callback + @log = options.fetch(:log) { ::Logger.new($stderr) } - # pub/sub blocks the connection so we must use a different redis connection - @client_redis = client.redis + # pub/sub blocks the connection so we must use a different redis + # connection + @client_redis = client.redis @listener_redis = client.new_redis_connection @my_channel = Qless.generate_jid end - def start_pub_sub_listener - @thread = ::Thread.start do - @listener_redis.subscribe(channel, @my_channel) do |on| - on.message do |_channel, message| - if _channel == @my_channel - @listener_redis.unsubscribe(@my_channel) - else - @message_received_callback.call(self, JSON.parse(message)) - end + # Start a thread listening + def start + queue = ::Queue.new + + @thread = Thread.start do + @listener_redis.subscribe(@channel, @my_channel) do |on| + on.subscribe do |channel| + queue.push(:subscribed) if channel == @channel end + + on.message do |channel, message| + handle_message(channel, message) + end end end - wait_until_thread_listening + queue.pop end - def wait_until_thread_listening - Qless::WaitUntil.wait_until(10) do - @client_redis.publish(@my_channel, 'disconnect') == 1 + def stop + @client_redis.publish(@my_channel, 'disconnect') + @thread.join + end + + private + + def handle_message(channel, message) + if channel == @my_channel + @listener_redis.unsubscribe(@channel, @my_channel) if message == "disconnect" + else + @message_received_callback.call(self, JSON.parse(message)) end + rescue Exception => error + @log.error("Qless::Subscriber") { error } end end end - -