lib/pubsubstub/stream_action.rb in pubsubstub-0.0.9 vs lib/pubsubstub/stream_action.rb in pubsubstub-0.0.10
- old
+ new
@@ -1,7 +1,9 @@
module Pubsubstub
class StreamAction < Pubsubstub::Action
+ RECONNECT_TIMEOUT = 10_000
+
def initialize(*)
super
start_heartbeat
end
@@ -10,33 +12,74 @@
headers({
'Cache-Control' => 'no-cache',
'X-Accel-Buffering' => 'no',
'Connection' => 'keep-alive',
})
+
+ if EventMachine.reactor_running?
+ subscribe_connection
+ else
+ return_scrollback
+ end
+ end
+
+ private
+
+ def return_scrollback
+ buffer = ''
+ ensure_connection_has_event(buffer)
+
+ with_each_channel do |channel|
+ channel.scrollback(buffer, last_event_id)
+ end
+
+ buffer
+ end
+
+ def last_event_id
+ request.env['HTTP_LAST_EVENT_ID']
+ end
+
+ def subscribe_connection
stream(:keep_open) do |connection|
@connections << connection
- channels = params[:channels] || [:default]
- channels.each do |channel_name|
- channel(channel_name).subscribe(connection, last_event_id: request.env['HTTP_LAST_EVENT_ID'])
+ ensure_connection_has_event(connection)
+ with_each_channel do |channel|
+ channel.subscribe(connection, last_event_id: last_event_id)
end
connection.callback do
@connections.delete(connection)
- channels.each do |channel_name|
- channel(channel_name).unsubscribe(connection)
+ with_each_channel do |channel|
+ channel.unsubscribe(connection)
end
end
end
end
- private
+ def ensure_connection_has_event(connection)
+ return if last_event_id
+ connection << heartbeat_event.to_message
+ end
+
def start_heartbeat
@heartbeat = Thread.new do
loop do
sleep Pubsubstub.heartbeat_frequency
- event = Event.new('ping', name: 'heartbeat').to_message
+ event = heartbeat_frequency.to_message
@connections.each { |connection| connection << event }
end
end
+ end
+
+ def with_each_channel(&block)
+ channels = params[:channels] || [:default]
+ channels.each do |channel_name|
+ yield channel(channel_name)
+ end
+ end
+
+ def heartbeat_event
+ Event.new('ping', name: 'heartbeat', retry_after: RECONNECT_TIMEOUT)
end
end
end