Sha256: 9f9b200931f372feec9798aecc88e799ddc52c18767001930579f870b802e3da
Contents?: true
Size: 1.85 KB
Versions: 4
Compression:
Stored size: 1.85 KB
Contents
require 'securerandom'

module Firehose
  # Binds a per-subscriber AMQP queue to the fanout exchange named after a path and hands
  # every delivered payload to the caller's block.
  class Subscription
    # Default queue expiry passed to the broker as the 'x-expires' queue argument.
    # RabbitMQ documents x-expires in milliseconds, which makes this 15 seconds.
    TTL = 15000

    # Time to live for the queue on the server after the subscription is canceled. This
    # is mostly for flakey connections where the client may reconnect after *ttl* and continue
    # receiving messages. Only the writer is generated here; the reader is defined below so
    # that it can fall back to TTL.
    attr_writer :ttl

    # Globally unique subscription id
    attr_reader :subscriber_id

    # @param subscriber_id [String, nil] id to reuse (e.g. for a reconnecting client); a
    #   fresh UUID is generated when omitted.
    def initialize(subscriber_id=nil)
      @subscriber_id = subscriber_id || self.class.subscriber_id
    end

    # Starts consuming messages published to +path+.
    #
    # Declares a queue named "<subscriber_id>@<path>" carrying the current ttl as its
    # 'x-expires' argument, binds it to an auto-deleted fanout exchange named +path+ and
    # registers a consumer whose tag is the subscriber id.
    #
    # @param path [String] name of the exchange to listen on.
    # @yieldparam payload the body of each delivered message.
    # @raise [ArgumentError] when no block is given.
    # @return whatever the consumer's +consume+ call returns.
    def subscribe(path, &block)
      # Fail before any broker objects are created instead of blowing up with a
      # NoMethodError on nil when the first message arrives.
      raise ArgumentError, 'subscribe requires a block to receive payloads' unless block

      queue_name = "#{subscriber_id}@#{path}"
      channel    = AMQP::Channel.new(Firehose.amqp.connection).prefetch(1)
      exchange   = AMQP::Exchange.new(channel, :fanout, path, :auto_delete => true)
      queue      = AMQP::Queue.new(channel, queue_name, :arguments => {'x-expires' => ttl})
      queue.bind(exchange)

      # The consumer is kept so that #unsubscribe can cancel it later. Per the original
      # author's note, removing the consumer is what lets the queue's x-expires timer start
      # ticking down, and a reconnecting consumer resets it... in theory at least.
      @consumer = AMQP::Consumer.new(channel, queue, subscriber_id)
      @consumer.on_delivery do |metadata, payload|
        block.call(payload)
        # The ack needs to go after the block is called. This makes sure that all processing
        # happens downstream before we remove it from the queue entirely.
        metadata.ack
      end.consume
    end

    # Cancels the consumer created by #subscribe. Safe to call before subscribing and safe
    # to call repeatedly: the consumer reference is dropped so it is cancelled only once.
    #
    # NOTE(review): the channel opened in #subscribe is not closed here - confirm whether
    # that is intentional before adding a close (an ack may still be pending on it).
    #
    # @return the result of the consumer's +cancel+, or nil when there was nothing to cancel.
    def unsubscribe
      consumer, @consumer = @consumer, nil
      consumer.cancel if consumer
    end

    # The time that a queue should live *after* the client unsubscribes. This is useful for
    # flakey network connections, like HTTP Long Polling or even broken web sockets.
    #
    # @return the explicitly assigned ttl, or TTL when none was set.
    def ttl
      @ttl ||= TTL
    end

    # Generates a fresh subscriber id. This stays public: +protected+/+private+ do not apply
    # to methods defined with +def self.+, and #initialize calls it through +self.class+.
    #
    # @return [String] a random UUID.
    def self.subscriber_id
      SecureRandom.uuid
    end
  end
end
Version data entries
4 entries across 4 versions & 1 rubygems
Version | Path |
---|---|
firehose-0.0.6 | lib/firehose/subscription.rb |
firehose-0.0.5 | lib/firehose/subscription.rb |
firehose-0.0.4 | lib/firehose/subscription.rb |
firehose-0.0.3 | lib/firehose/subscription.rb |