examples/server_sent_events.rb in reel-0.4.0 vs examples/server_sent_events.rb in reel-0.5.0.pre

- old
+ new

@@ -1,69 +1,73 @@ #!/usr/bin/env ruby +# See: http://www.w3.org/TR/eventsource/ # Run with: bundle exec examples/server_sent_events.rb # Test with: curl -vNH 'Accept: text/event-stream' -H 'Last-Event-ID: 1' -H 'Cache-Control: no-cache' http://localhost:63310 require 'bundler/setup' require 'time' require 'reel' -class ServerSentEvents < Reel::Server + +class ServerSentEvents < Reel::Server::HTTP include Celluloid::Logger def initialize(ip = '127.0.0.1', port = 63310) @connections = [] @history = [] - @lastMessageId = 0 + @lastEventId = 0 async.ping - async.ring + async.ring #not needed for Production, only to have some events here. super(ip, port, &method(:on_connection)) end + #broadcasts events to all clients def broadcast(event, data) #only keep the last 5000 Events if @history.size >= 6000 @history.slice!(0, @history.size - 1000) end - @lastMessageId += 1 - @history << {id: @lastMessageId, event: event, data: data} + @lastEventId += 1 + @history << {id: @lastEventId, event: event, data: data} info "Sending Event: #{event} Data: #{data} to #{@connections.count} Clients" @connections.each do |socket| - async.send_sse(socket, data, event, @lastMessageId) + async.send_sse(socket, data, event, @lastEventId) end true end private - #event and id are optional + #event and id are optional, Eventsource only needs data def send_sse(socket, data, event = nil, id = nil) begin socket.id id if id socket.event event if event socket.data data - rescue IOError, Errno::ECONNRESET, Errno::EPIPE - @connections.delete(socket) + rescue Reel::SocketError, NoMethodError + @connections.delete(socket) if @connections.include?(socket) end end + #Lines that start with a Colon are Comments and will be ignored def send_ping @connections.each do |socket| begin - #Lines that start with a Colon are Comments and will be ignored socket << ":\n" - rescue IOError, Errno::ECONNRESET, Errno::EPIPE + rescue Reel::SocketError @connections.delete(socket) end end end + #apache 2.2 closes connections after five seconds when nothing is send, see this as a poor mans Keep-Alive def ping - #apache 2.2 closes connections after five seconds when nothing is send every(5) do send_ping end end + #only used to have some events here, not needed for Production. def ring every(2) do broadcast(:time, Time.now.httpdate) end end @@ -75,23 +79,23 @@ if key && value key, value = CGI.unescape(key), CGI.unescape(value) query[key] = value end end - body = Reel::EventStream.new do |socket| + #see https://github.com/celluloid/reel/blob/master/lib/reel/stream.rb#L35 + eventStream = Reel::EventStream.new do |socket| @connections << socket socket.retry 5000 - #after a Connection reset resend newer Messages to the Client + #after a Connection reset resend newer Messages to the Client, query['lastEventId'] is needed for https://github.com/Yaffle/EventSource if @history.count > 0 && id = (request.headers['Last-Event-ID'] || query['lastEventId']) begin - id = Integer(id) - if history = @history.select {|h| h[:id] >= id}.map {|a| "id: %d\nevent: %s\ndata: %s" % [a[:id], a[:event], a[:data]]}.join("\n\n") + if history = @history.select {|h| h[:id] >= Integer(id)}.map {|a| "id: %d\nevent: %s\ndata: %s" % [a[:id], a[:event], a[:data]]}.join("\n\n") socket << "%s\n\n" % [history] else socket << "id\n\n" end - rescue ArgumentError, IOError, Errno::ECONNRESET, Errno::EPIPE + rescue ArgumentError, Reel::SocketError @connections.delete(socket) request.close end else socket << "id\n\n" @@ -100,10 +104,10 @@ #X-Accel-Buffering is nginx(?) specific. Setting this to "no" will allow unbuffered responses suitable for Comet and HTTP streaming applications request.respond Reel::StreamResponse.new(:ok, { 'Content-Type' => 'text/event-stream; charset=utf-8', 'Cache-Control' => 'no-cache', 'X-Accel-Buffering' => 'no', - 'Access-Control-Allow-Origin' => '*'}, body) + 'Access-Control-Allow-Origin' => '*'}, eventStream) end def on_connection(connection) connection.each_request do |request| handle_request(request)