lib/firehose/subscription.rb in firehose-0.0.16 vs lib/firehose/subscription.rb in firehose-0.1.0

- old
+ new

@@ -1,65 +1,75 @@ module Firehose class Subscription - # Default TTL for how long a subscription should live on the server when the - # consumer disconnects. - # TODO should the Consumer handle TTL? - TTL = 15000 + # consumer_id and channel for the subscription. + attr_reader :consumer_id - # Time to live for the amqp_queue on the server after the subscription is canceled. This - # is mostly for flakey connections where the client may reconnect after *ttl* and continue - # receiving messages. - attr_accessor :ttl - - # Consumer and channel for the subscription. - attr_reader :consumer - # Channel that we'll use for the pub-sub activity. This probably maps to an URL attr_reader :channel - def initialize(consumer, channel) - @consumer, @channel = consumer, channel + def initialize(consumer_id, channel) + @consumer_id, @channel = consumer_id, channel end - # TODO - Move the channel to an initializer so that we can force on AMQP subscription per one - # Firehose subscription. As it stands now, you could fire off multple subscriptions to diff amqp_channels + # Subscribe to messages on the backend to fill up the subscription queue. consumer_ids of the messages + # will queue up units of "work" to process data from the subscription. def subscribe(&block) - amqp_queue_name = "#{consumer.guid}@#{channel}" - amqp_channel = AMQP::Channel.new(Firehose.amqp.connection).prefetch(1) - amqp_exchange = AMQP::Exchange.new(amqp_channel, :fanout, channel, :auto_delete => true) - amqp_queue = AMQP::Queue.new(amqp_channel, amqp_queue_name, :arguments => {'x-expires' => ttl}) - amqp_queue.bind(amqp_exchange) - - # When we get a message, we want to remove the consumer from the amqp_queue so that the x-expires - # ttl starts ticking down. On the reconnect, the consumer connects to the amqp_queue and resets the - # timer on x-expires... in theory at least. - @amqp_consumer = AMQP::Consumer.new(amqp_channel, amqp_queue, consumer.guid) - @amqp_consumer.on_delivery do |metadata, message| - Firehose.logger.debug "AMQP delivering `#{message}` to `#{consumer.guid}@#{channel}`" - block.call(message, self) - # The ack needs to go after the block is called. This makes sure that all processing - # happens downstream before we remove it from the amqp_queue entirely. - metadata.ack - end.consume - Firehose.logger.debug "AMQP subscribed to `#{consumer.guid}@#{channel}`" + redis.subscribe(channel) + redis.on(:message) do |channel, message| + Firehose.logger.debug "Redis recieved `#{message}` to `#{consumer_id}@#{channel}`" + block.call message, self + end + Firehose.logger.debug "Redis subscribed to `#{consumer_id}@#{channel}`" self # Return the subscription for chaining. end - def unsubscribe - Firehose.logger.debug "AMQP unsubscribed" - @amqp_consumer.cancel if @amqp_consumer - @unsubscribe_callback.call self if @unsubscribe_callback + def unsubscribe(&block) + Firehose.logger.debug "Redis unsubscribed from `#{consumer_id}@#{channel}`" + redis.close + block.call(self) if block + self end - # Callback when consumer unsubscribes from subscription. The consumer uses this to remove - # subscriptions from itself when an unsubscribe happens. - def on_unsubscribe(&block) - @unsubscribe_callback = block + private + def redis + @redis ||= EM::Hiredis.connect end + end - # The time that a amqp_queue should live *after* the client unsubscribes. This is useful for - # flakey network connections, like HTTP Long Polling or even broken web sockets. - def ttl - @ttl ||= TTL + # Queue subscription messages so that we can remember and/or operate on them + class Subscription::Queue + attr_reader :subscription, :channel + + def initialize(consumer_id, channel) + @subscription = Subscription.new(consumer_id, channel) + # Start the subscription and start dropping mesasge onto the queue + subscription.subscribe do |message| + queue.push message + end + end + + # Pop an item off the subscription queue so we can work on it. + def pop(&block) + queue.pop do |message| + block.call message, subscription + end + end + + # Kill the queue in n seconds. + def kill(ttl=0, &block) + if ttl.zero? + subscription.unsubscribe &block + else + @timer = EM::Timer.new(ttl){ kill 0 } + end + end + + def live + @timer.cancel if @timer + end + + private + def queue + @queue ||= EM::Queue.new end end end \ No newline at end of file