Sha256: 6f613d26787821b43d4b98feba76e3f0b939897495ccf0ae24dfed9f24307dbb

Contents?: true

Size: 1.58 KB

Versions: 9

Compression:

Stored size: 1.58 KB

Contents

module Pubsubstub
  # Ties a set of channels to one client connection. Events handed to #push
  # are buffered in an in-memory Queue and written to the connection, one
  # message at a time, by #stream.
  class Subscription
    include Logging

    attr_reader :id, :channels, :connection, :queue

    # @param channels   collection of channel objects; each is asked for
    #                   +name+ and +scrollback(since:)+ by this class
    # @param connection object that accepts messages through +<<+
    def initialize(channels, connection)
      @channels = channels
      @connection = connection
      @queue = Queue.new
      # Random identifier, only used here to tag log lines for this client.
      @id = Random.rand(2**64)
    end

    # Subscribes to every channel, queues the scrollback (or a heartbeat) and
    # then forwards queued events to the connection until a falsy value is
    # popped from the queue. The listeners are always removed on the way out,
    # including when an exception interrupts the loop.
    #
    # @param last_event_id id passed to +scrollback(since:)+; when falsy no
    #                      scrollback is requested
    def stream(last_event_id)
      info do
        names = channels.map(&:name).join(', ')
        "Connecting client ##{id} (#{names})"
      end
      subscribe
      fetch_scrollback(last_event_id)

      event = queue.pop
      while event
        debug { "Sending event ##{event.id} to client ##{id}" }
        connection << event.to_message
        event = queue.pop
      end
    ensure
      info { "Disconnecting client ##{id}" }
      unsubscribe
    end

    # Queues an event for delivery by #stream. Also serves as the listener
    # registered with the subscriber (see #callback).
    def push(event)
      queue.push(event)
    end

    private

    # Registers the callback as an event listener for each channel name.
    def subscribe
      channels.each do |channel|
        Pubsubstub.subscriber.add_event_listener(channel.name, callback)
      end
    end

    # Removes the listener previously registered for each channel name.
    def unsubscribe
      channels.each do |channel|
        Pubsubstub.subscriber.remove_event_listener(channel.name, callback)
      end
    end

    # Queues the events each channel reports since +last_event_id+. When
    # nothing was queued (no id given, or empty scrollbacks) a single
    # heartbeat event is queued instead.
    #
    # This method is not ideal: channels are replayed one after the other, so
    # ordering across channels is not guaranteed for multi-channel
    # subscriptions.
    def fetch_scrollback(last_event_id)
      replayed = 0

      if last_event_id
        channels.each do |channel|
          channel.scrollback(since: last_event_id).each do |event|
            queue.push(event)
            replayed += 1
          end
        end
      end

      queue.push(Pubsubstub.heartbeat_event) if replayed.zero?
    end

    # The callback is stored so that the very same object is used to
    # subscribe and to unsubscribe; a fresh Method object on every call
    # would have a different object_id and could not be removed again.
    def callback
      @callback ||= method(:push)
    end
  end
end

Version data entries

9 entries across 9 versions & 1 rubygems

Version Path
pubsubstub-0.3.0 lib/pubsubstub/subscription.rb
pubsubstub-0.2.2 lib/pubsubstub/subscription.rb
pubsubstub-0.2.1 lib/pubsubstub/subscription.rb
pubsubstub-0.2.0 lib/pubsubstub/subscription.rb
pubsubstub-0.1.3 lib/pubsubstub/subscription.rb
pubsubstub-0.1.2 lib/pubsubstub/subscription.rb
pubsubstub-0.1.1 lib/pubsubstub/subscription.rb
pubsubstub-0.1.0 lib/pubsubstub/subscription.rb
pubsubstub-0.1.0.beta1 lib/pubsubstub/subscription.rb