Sha256: 6f613d26787821b43d4b98feba76e3f0b939897495ccf0ae24dfed9f24307dbb
Contents?: true
Size: 1.58 KB
Versions: 9
Compression:
Stored size: 1.58 KB
Contents
module Pubsubstub class Subscription include Logging attr_reader :channels, :connection, :queue, :id def initialize(channels, connection) @id = Random.rand(2 ** 64) @connection = connection @channels = channels @queue = Queue.new end def stream(last_event_id) info { "Connecting client ##{id} (#{channels.map(&:name).join(', ')})" } subscribe fetch_scrollback(last_event_id) while event = queue.pop debug { "Sending event ##{event.id} to client ##{id}"} connection << event.to_message end ensure info { "Disconnecting client ##{id}" } unsubscribe end def push(event) queue.push(event) end private def subscribe channels.each { |c| Pubsubstub.subscriber.add_event_listener(c.name, callback) } end def unsubscribe channels.each { |c| Pubsubstub.subscriber.remove_event_listener(c.name, callback) } end # This method is not ideal as it doesn't guarantee order in case of multi-channel subscription def fetch_scrollback(last_event_id) event_sent = false if last_event_id channels.each do |channel| channel.scrollback(since: last_event_id).each do |event| event_sent = true queue.push(event) end end end queue.push(Pubsubstub.heartbeat_event) unless event_sent end # We use store the callback so that the object_id stays the same. # Otherwise we wouldn't be able to unsubscribe def callback @callback ||= method(:push) end end end
Version data entries
9 entries across 9 versions & 1 rubygems