Sha256: 9f9b200931f372feec9798aecc88e799ddc52c18767001930579f870b802e3da

Contents?: true

Size: 1.85 KB

Versions: 4

Compression:

Stored size: 1.85 KB

Contents

require 'securerandom'

module Firehose
  class Subscription
    TTL = 15000

    # Time to live for the queue on the server after the subscription is canceled. This
    # is mostly for flakey connections where the client may reconnect after *ttl* and continue
    # receiving messages.
    attr_accessor :ttl

    # Globally unique subscription id
    attr_reader :subscriber_id

    def initialize(subscriber_id=nil)
      @subscriber_id = subscriber_id || self.class.subscriber_id
    end

    def subscribe(path, &block)
      queue_name  = "#{subscriber_id}@#{path}"
      channel     = AMQP::Channel.new(Firehose.amqp.connection).prefetch(1)
      exchange    = AMQP::Exchange.new(channel, :fanout, path, :auto_delete => true)
      queue       = AMQP::Queue.new(channel, queue_name, :arguments => {'x-expires' => ttl})
      queue.bind(exchange)

      # When we get a message, we want to remove the consumer from the queue so that the x-expires
      # ttl starts ticking down. On the reconnect, the consumer connects to the queue and resets the
      # timer on x-expires... in theory at least.
      @consumer = AMQP::Consumer.new(channel, queue, subscriber_id)
      @consumer.on_delivery do |metadata, payload|
        p [:get, subscriber_id, @consumer.consumer_tag, path, payload]
        block.call(payload)
        # The ack needs to go after the block is called. This makes sure that all processing
        # happens downstream before we remove it from the queue entirely.
        metadata.ack
      end.consume
    end

    def unsubscribe
      @consumer.cancel if @consumer
    end

    # The time that a queue should live *after* the client unsubscribes. This is useful for
    # flakey network connections, like HTTP Long Polling or even broken web sockets.
    def ttl
      @ttl ||= TTL
    end

  protected
    def self.subscriber_id
      SecureRandom.uuid
    end
  end
end

Version data entries

4 entries across 4 versions & 1 rubygems

Version Path
firehose-0.0.6 lib/firehose/subscription.rb
firehose-0.0.5 lib/firehose/subscription.rb
firehose-0.0.4 lib/firehose/subscription.rb
firehose-0.0.3 lib/firehose/subscription.rb