Sha256: d3330d4e8a5b21e19ba3def32b672459043f273bcee1d87ae336ebe12aee60dc
Contents?: true
Size: 1.84 KB
Versions: 4
Compression:
Stored size: 1.84 KB
Contents
module Firehose class Subscription TTL = 15000 # Time to live for the queue on the server after the subscription is canceled. This # is mostly for flakey connections where the client may reconnect after *ttl* and continue # receiving messages. attr_accessor :ttl # Globally unique subscription id attr_reader :subscriber_id def initialize(subscriber_id=nil) @subscriber_id = subscriber_id || self.class.subscriber_id end def subscribe(path, &block) queue_name = "#{subscriber_id}@#{path}" channel = AMQP::Channel.new(Firehose.amqp.connection).prefetch(1) exchange = AMQP::Exchange.new(channel, :fanout, path, :auto_delete => true) queue = AMQP::Queue.new(channel, queue_name, :arguments => {'x-expires' => ttl}) queue.bind(exchange) # When we get a message, we want to remove the consumer from the queue so that the x-expires # ttl starts ticking down. On the reconnect, the consumer connects to the queue and resets the # timer on x-expires... in theory at least. @consumer = AMQP::Consumer.new(channel, queue, subscriber_id) @consumer.on_delivery do |metadata, payload| p [:get, subscriber_id, @consumer.consumer_tag, path, payload] block.call(payload) # The ack needs to go after the block is called. This makes sure that all processing # happens downstream before we remove it from the queue entirely. metadata.ack end.consume end def unsubscribe @consumer.cancel if @consumer end # The time that a queue should live *after* the client unsubscribes. This is useful for # flakey network connections, like HTTP Long Polling or even broken web sockets. def ttl @ttl ||= TTL end protected def self.subscriber_id rand(999_999_999_999).to_s end end end
Version data entries
4 entries across 4 versions & 1 rubygems
Version | Path |
---|---|
firehose-0.0.10 | lib/firehose/subscription.rb |
firehose-0.0.9 | lib/firehose/subscription.rb |
firehose-0.0.8 | lib/firehose/subscription.rb |
firehose-0.0.7 | lib/firehose/subscription.rb |