lib/firehose/subscription.rb in firehose-0.0.16 vs lib/firehose/subscription.rb in firehose-0.1.0
- old
+ new
@@ -1,65 +1,75 @@
module Firehose
class Subscription
- # Default TTL for how long a subscription should live on the server when the
- # consumer disconnects.
- # TODO should the Consumer handle TTL?
- TTL = 15000
+ # consumer_id and channel for the subscription.
+ attr_reader :consumer_id
- # Time to live for the amqp_queue on the server after the subscription is canceled. This
- # is mostly for flakey connections where the client may reconnect after *ttl* and continue
- # receiving messages.
- attr_accessor :ttl
-
- # Consumer and channel for the subscription.
- attr_reader :consumer
-
# Channel that we'll use for the pub-sub activity. This probably maps to an URL
attr_reader :channel
- def initialize(consumer, channel)
- @consumer, @channel = consumer, channel
+ def initialize(consumer_id, channel)
+ @consumer_id, @channel = consumer_id, channel
end
- # TODO - Move the channel to an initializer so that we can force on AMQP subscription per one
- # Firehose subscription. As it stands now, you could fire off multple subscriptions to diff amqp_channels
+ # Subscribe to messages on the backend to fill up the subscription queue. consumer_ids of the messages
+ # will queue up units of "work" to process data from the subscription.
def subscribe(&block)
- amqp_queue_name = "#{consumer.guid}@#{channel}"
- amqp_channel = AMQP::Channel.new(Firehose.amqp.connection).prefetch(1)
- amqp_exchange = AMQP::Exchange.new(amqp_channel, :fanout, channel, :auto_delete => true)
- amqp_queue = AMQP::Queue.new(amqp_channel, amqp_queue_name, :arguments => {'x-expires' => ttl})
- amqp_queue.bind(amqp_exchange)
-
- # When we get a message, we want to remove the consumer from the amqp_queue so that the x-expires
- # ttl starts ticking down. On the reconnect, the consumer connects to the amqp_queue and resets the
- # timer on x-expires... in theory at least.
- @amqp_consumer = AMQP::Consumer.new(amqp_channel, amqp_queue, consumer.guid)
- @amqp_consumer.on_delivery do |metadata, message|
- Firehose.logger.debug "AMQP delivering `#{message}` to `#{consumer.guid}@#{channel}`"
- block.call(message, self)
- # The ack needs to go after the block is called. This makes sure that all processing
- # happens downstream before we remove it from the amqp_queue entirely.
- metadata.ack
- end.consume
- Firehose.logger.debug "AMQP subscribed to `#{consumer.guid}@#{channel}`"
+ redis.subscribe(channel)
+ redis.on(:message) do |channel, message|
+ Firehose.logger.debug "Redis recieved `#{message}` to `#{consumer_id}@#{channel}`"
+ block.call message, self
+ end
+ Firehose.logger.debug "Redis subscribed to `#{consumer_id}@#{channel}`"
self # Return the subscription for chaining.
end
- def unsubscribe
- Firehose.logger.debug "AMQP unsubscribed"
- @amqp_consumer.cancel if @amqp_consumer
- @unsubscribe_callback.call self if @unsubscribe_callback
+ def unsubscribe(&block)
+ Firehose.logger.debug "Redis unsubscribed from `#{consumer_id}@#{channel}`"
+ redis.close
+ block.call(self) if block
+ self
end
- # Callback when consumer unsubscribes from subscription. The consumer uses this to remove
- # subscriptions from itself when an unsubscribe happens.
- def on_unsubscribe(&block)
- @unsubscribe_callback = block
+ private
+ def redis
+ @redis ||= EM::Hiredis.connect
end
+ end
- # The time that a amqp_queue should live *after* the client unsubscribes. This is useful for
- # flakey network connections, like HTTP Long Polling or even broken web sockets.
- def ttl
- @ttl ||= TTL
+ # Queue subscription messages so that we can remember and/or operate on them
+ class Subscription::Queue
+ attr_reader :subscription, :channel
+
+ def initialize(consumer_id, channel)
+ @subscription = Subscription.new(consumer_id, channel)
+ # Start the subscription and start dropping mesasge onto the queue
+ subscription.subscribe do |message|
+ queue.push message
+ end
+ end
+
+ # Pop an item off the subscription queue so we can work on it.
+ def pop(&block)
+ queue.pop do |message|
+ block.call message, subscription
+ end
+ end
+
+ # Kill the queue in n seconds.
+ def kill(ttl=0, &block)
+ if ttl.zero?
+ subscription.unsubscribe &block
+ else
+ @timer = EM::Timer.new(ttl){ kill 0 }
+ end
+ end
+
+ def live
+ @timer.cancel if @timer
+ end
+
+ private
+ def queue
+ @queue ||= EM::Queue.new
end
end
end
\ No newline at end of file