lib/rasti/web/broadcaster.rb in rasti-web-broadcaster-1.1.2 vs lib/rasti/web/broadcaster.rb in rasti-web-broadcaster-2.0.0

- old
+ new

@@ -2,31 +2,35 @@ require 'broadcaster' require 'class_config' require_relative 'broadcaster/safe_event_machine' require_relative 'broadcaster/safe_rack_lint' +require_relative 'broadcaster/timer' require_relative 'broadcaster/version' module Rasti module Web class Broadcaster + KEEP_ALIVE_EVENT = 'keepAlive' + extend ClassConfig - attr_config :id, 'rasti.web.broadcaster' - attr_config :redis_client, Redic - attr_config :redis_settings, 'redis://localhost:6379' - attr_config :logger, Logger.new(STDOUT) + attr_config :id, 'rasti.web.broadcaster' + attr_config :redis_client, Redic + attr_config :redis_settings, 'redis://localhost:6379' + attr_config :logger, Logger.new(STDOUT) + attr_config :keep_alive_interval @mutex = Mutex.new class << self extend Forwardable - def_delegators :broadcaster, :subscribe, - :unsubscribe, + def_delegators :broadcaster, :subscribe, + :unsubscribe, :publish private def broadcaster @@ -38,24 +42,24 @@ end def initialize(app, headers={}) @app = app @headers = headers + @mutex = Mutex.new + @subscriptions = {} + + start_sending_keep_alive_messages end def call(env) if Faye::EventSource.eventsource? env - event_source = Faye::EventSource.new env, headers: @headers - channel = env['PATH_INFO'][1..-1] + event_source = Faye::EventSource.new env, headers: headers - subscription_id = self.class.subscribe channel do |message| - event_source.send message[:data], event: message[:event], - id: message[:id] - end + subscription_id = subscribe channel_from(env), event_source event_source.on :close do - self.class.unsubscribe subscription_id + unsubscribe subscription_id event_source = nil end event_source.rack_response else @@ -63,10 +67,44 @@ end end private - attr_reader :app + attr_reader :app, :headers, :mutex, :subscriptions + + def subscribe(channel, event_source) + subscription_id = self.class.subscribe channel do |message| + send_message(event_source, **message) + end + + mutex.synchronize { subscriptions[subscription_id] = event_source } + + subscription_id + end + + def unsubscribe(subscription_id) + self.class.unsubscribe subscription_id + mutex.synchronize { subscriptions.delete subscription_id } + end + + def send_message(event_source, data:, event: nil, id: nil) + event_source.send data, event: event, id: id + end + + def start_sending_keep_alive_messages + if self.class.keep_alive_interval + Timer.every self.class.keep_alive_interval do + subscriptions.each do |subscription_id, event_source| + self.class.logger.debug(self.class) { "Sending keep alive to #{subscription_id}" } + send_message event_source, data: '', event: KEEP_ALIVE_EVENT + end + end + end + end + + def channel_from(env) + env['PATH_INFO'][1..-1] + end end end end