Sha256: 821f6dedb0dca25f10d25741e13596e8208bba9a8353cf407dba94f5acf5a560

Contents?: true

Size: 1.54 KB

Versions: 8

Compression:

Stored size: 1.54 KB

Contents

# Encoding: utf-8

require 'thread'

module Qless
  # Listens for messages published on a channel from a background thread and
  # hands each JSON-decoded message to a callback.
  class Subscriber
    # Builds a subscriber and starts it immediately.
    #
    # Takes the same arguments and block as {#initialize}.
    #
    # @return [Subscriber] the subscriber, after {#start} has returned
    # @raise [StandardError] whatever {#start} raises
    def self.start(*args, &block)
      new(*args, &block).tap(&:start)
    end

    # NOTE(review): nothing in this class assigns @redis, so #redis always
    # returns nil. The reader is kept so existing callers do not break.
    attr_reader :channel, :redis

    # @param client [#redis, #new_redis_connection] source of the two
    #   connections used by the subscriber
    # @param channel [String] name of the channel to listen on
    # @param options [Hash] optional settings
    # @option options [#error] :log receives errors raised while handling a
    #   message; defaults to a Logger writing to $stderr
    # @yield [subscriber, message] called for every message received on
    #   +channel+
    # @yieldparam subscriber [Subscriber] this instance
    # @yieldparam message [Object] the message payload parsed as JSON
    def initialize(client, channel, options = {}, &message_received_callback)
      @channel = channel
      @message_received_callback = message_received_callback
      @log = options.fetch(:log) { ::Logger.new($stderr) }

      # pub/sub blocks the connection so we must use a different redis
      # connection
      @client_redis   = client.redis
      @listener_redis = client.new_redis_connection

      # Private control channel; #stop publishes on it to end the listener.
      @my_channel = Qless.generate_jid
    end

    # Starts the listener thread and waits until the subscription to
    # +channel+ has been confirmed.
    #
    # @return [Symbol] :subscribed once the subscription is confirmed
    # @raise [StandardError] the error that terminated the listener thread,
    #   if it failed before the subscription was confirmed (previously this
    #   situation left the caller blocked forever)
    def start
      startup = ::Queue.new

      @thread = Thread.start { listen(startup) }

      outcome = startup.pop
      raise outcome if outcome.is_a?(StandardError)

      outcome
    end

    # Asks the listener to unsubscribe and waits for its thread to finish.
    #
    # @return [Thread, nil] the finished listener thread, or nil when the
    #   subscriber was never started
    def stop
      # Without a listener thread there is nothing to disconnect or join.
      return unless @thread

      @client_redis.publish(@my_channel, 'disconnect')
      @thread.join
    end

  private

    # Body of the listener thread: subscribes to both channels and dispatches
    # incoming messages. Reports the outcome of starting up through +startup+.
    def listen(startup)
      @listener_redis.subscribe(@channel, @my_channel) do |on|
        on.subscribe do |channel|
          startup.push(:subscribed) if channel == @channel
        end

        on.message do |channel, message|
          handle_message(channel, message)
        end
      end
    rescue StandardError => error
      # Wake up a caller still waiting in #start, then let the thread fail as
      # it did before so that Thread#join keeps re-raising the error.
      startup.push(error)
      raise
    end

    # Handles one incoming message: a "disconnect" on the private channel
    # unsubscribes the listener; anything on another channel is parsed as
    # JSON and passed to the callback.
    def handle_message(channel, message)
      if channel == @my_channel
        @listener_redis.unsubscribe(@channel, @my_channel) if message == "disconnect"
      else
        @message_received_callback.call(self, JSON.parse(message))
      end
    rescue StandardError => error
      # Log and keep listening. Only StandardError is rescued so that
      # SystemExit, NoMemoryError and the like are not swallowed.
      @log.error("Qless::Subscriber") { error }
    end
  end
end

Version data entries

8 entries across 8 versions & 1 rubygems

Version Path
qless-0.12.0 lib/qless/subscriber.rb
qless-0.11.0 lib/qless/subscriber.rb
qless-0.10.5 lib/qless/subscriber.rb
qless-0.10.4 lib/qless/subscriber.rb
qless-0.10.3 lib/qless/subscriber.rb
qless-0.10.2 lib/qless/subscriber.rb
qless-0.10.1 lib/qless/subscriber.rb
qless-0.10.0 lib/qless/subscriber.rb