lib/qless/subscriber.rb in qless-0.9.3 vs lib/qless/subscriber.rb in qless-0.10.0
- old
+ new
@@ -1,48 +1,63 @@
+# Encoding: utf-8
+
require 'thread'
-require 'qless/wait_until'
module Qless
+ # A class used for subscribing to messages in a thread
class Subscriber
def self.start(*args, &block)
- new(*args, &block).start_pub_sub_listener
+ new(*args, &block).tap(&:start)
end
- attr_reader :client, :channel
+ attr_reader :channel, :redis
- def initialize(client, channel, &message_received_callback)
- @client = client
+ def initialize(client, channel, options = {}, &message_received_callback)
@channel = channel
@message_received_callback = message_received_callback
+ @log = options.fetch(:log) { ::Logger.new($stderr) }
- # pub/sub blocks the connection so we must use a different redis connection
- @client_redis = client.redis
+ # pub/sub blocks the connection so we must use a different redis
+ # connection
+ @client_redis = client.redis
@listener_redis = client.new_redis_connection
@my_channel = Qless.generate_jid
end
- def start_pub_sub_listener
- @thread = ::Thread.start do
- @listener_redis.subscribe(channel, @my_channel) do |on|
- on.message do |_channel, message|
- if _channel == @my_channel
- @listener_redis.unsubscribe(@my_channel)
- else
- @message_received_callback.call(self, JSON.parse(message))
- end
+ # Start a thread listening
+ def start
+ queue = ::Queue.new
+
+ @thread = Thread.start do
+ @listener_redis.subscribe(@channel, @my_channel) do |on|
+ on.subscribe do |channel|
+ queue.push(:subscribed) if channel == @channel
end
+
+ on.message do |channel, message|
+ handle_message(channel, message)
+ end
end
end
- wait_until_thread_listening
+ queue.pop
end
- def wait_until_thread_listening
- Qless::WaitUntil.wait_until(10) do
- @client_redis.publish(@my_channel, 'disconnect') == 1
+ def stop
+ @client_redis.publish(@my_channel, 'disconnect')
+ @thread.join
+ end
+
+ private
+
+ def handle_message(channel, message)
+ if channel == @my_channel
+ @listener_redis.unsubscribe(@channel, @my_channel) if message == "disconnect"
+ else
+ @message_received_callback.call(self, JSON.parse(message))
end
+ rescue Exception => error
+ @log.error("Qless::Subscriber") { error }
end
end
end
-
-