lib/pubsubstub/stream_action.rb in pubsubstub-0.0.9 vs lib/pubsubstub/stream_action.rb in pubsubstub-0.0.10

- old
+ new

@@ -1,7 +1,9 @@ module Pubsubstub class StreamAction < Pubsubstub::Action + RECONNECT_TIMEOUT = 10_000 + def initialize(*) super start_heartbeat end @@ -10,33 +12,74 @@ headers({ 'Cache-Control' => 'no-cache', 'X-Accel-Buffering' => 'no', 'Connection' => 'keep-alive', }) + + if EventMachine.reactor_running? + subscribe_connection + else + return_scrollback + end + end + + private + + def return_scrollback + buffer = '' + ensure_connection_has_event(buffer) + + with_each_channel do |channel| + channel.scrollback(buffer, last_event_id) + end + + buffer + end + + def last_event_id + request.env['HTTP_LAST_EVENT_ID'] + end + + def subscribe_connection stream(:keep_open) do |connection| @connections << connection - channels = params[:channels] || [:default] - channels.each do |channel_name| - channel(channel_name).subscribe(connection, last_event_id: request.env['HTTP_LAST_EVENT_ID']) + ensure_connection_has_event(connection) + with_each_channel do |channel| + channel.subscribe(connection, last_event_id: last_event_id) end connection.callback do @connections.delete(connection) - channels.each do |channel_name| - channel(channel_name).unsubscribe(connection) + with_each_channel do |channel| + channel.unsubscribe(connection) end end end end - private + def ensure_connection_has_event(connection) + return if last_event_id + connection << heartbeat_event.to_message + end + def start_heartbeat @heartbeat = Thread.new do loop do sleep Pubsubstub.heartbeat_frequency - event = Event.new('ping', name: 'heartbeat').to_message + event = heartbeat_frequency.to_message @connections.each { |connection| connection << event } end end + end + + def with_each_channel(&block) + channels = params[:channels] || [:default] + channels.each do |channel_name| + yield channel(channel_name) + end + end + + def heartbeat_event + Event.new('ping', name: 'heartbeat', retry_after: RECONNECT_TIMEOUT) end end end