lib/rasti/web/broadcaster.rb in rasti-web-broadcaster-1.1.2 vs lib/rasti/web/broadcaster.rb in rasti-web-broadcaster-2.0.0
- old
+ new
@@ -2,31 +2,35 @@
require 'broadcaster'
require 'class_config'
require_relative 'broadcaster/safe_event_machine'
require_relative 'broadcaster/safe_rack_lint'
+require_relative 'broadcaster/timer'
require_relative 'broadcaster/version'
module Rasti
module Web
class Broadcaster
+ KEEP_ALIVE_EVENT = 'keepAlive'
+
extend ClassConfig
- attr_config :id, 'rasti.web.broadcaster'
- attr_config :redis_client, Redic
- attr_config :redis_settings, 'redis://localhost:6379'
- attr_config :logger, Logger.new(STDOUT)
+ attr_config :id, 'rasti.web.broadcaster'
+ attr_config :redis_client, Redic
+ attr_config :redis_settings, 'redis://localhost:6379'
+ attr_config :logger, Logger.new(STDOUT)
+ attr_config :keep_alive_interval
@mutex = Mutex.new
class << self
extend Forwardable
- def_delegators :broadcaster, :subscribe,
- :unsubscribe,
+ def_delegators :broadcaster, :subscribe,
+ :unsubscribe,
:publish
private
def broadcaster
@@ -38,24 +42,24 @@
end
def initialize(app, headers={})
@app = app
@headers = headers
+ @mutex = Mutex.new
+ @subscriptions = {}
+
+ start_sending_keep_alive_messages
end
def call(env)
if Faye::EventSource.eventsource? env
- event_source = Faye::EventSource.new env, headers: @headers
- channel = env['PATH_INFO'][1..-1]
+ event_source = Faye::EventSource.new env, headers: headers
- subscription_id = self.class.subscribe channel do |message|
- event_source.send message[:data], event: message[:event],
- id: message[:id]
- end
+ subscription_id = subscribe channel_from(env), event_source
event_source.on :close do
- self.class.unsubscribe subscription_id
+ unsubscribe subscription_id
event_source = nil
end
event_source.rack_response
else
@@ -63,10 +67,44 @@
end
end
private
- attr_reader :app
+ attr_reader :app, :headers, :mutex, :subscriptions
+
+ def subscribe(channel, event_source)
+ subscription_id = self.class.subscribe channel do |message|
+ send_message(event_source, **message)
+ end
+
+ mutex.synchronize { subscriptions[subscription_id] = event_source }
+
+ subscription_id
+ end
+
+ def unsubscribe(subscription_id)
+ self.class.unsubscribe subscription_id
+ mutex.synchronize { subscriptions.delete subscription_id }
+ end
+
+ def send_message(event_source, data:, event: nil, id: nil)
+ event_source.send data, event: event, id: id
+ end
+
+ def start_sending_keep_alive_messages
+ if self.class.keep_alive_interval
+ Timer.every self.class.keep_alive_interval do
+ subscriptions.each do |subscription_id, event_source|
+ self.class.logger.debug(self.class) { "Sending keep alive to #{subscription_id}" }
+ send_message event_source, data: '', event: KEEP_ALIVE_EVENT
+ end
+ end
+ end
+ end
+
+ def channel_from(env)
+ env['PATH_INFO'][1..-1]
+ end
end
end
end