Sha256: 05fada0c2f0dcc3d23aa13be2742f4f14a38d773711aa39ad215b3968529b2b6

Contents?: true

Size: 1.59 KB

Versions: 1

Compression:

Stored size: 1.59 KB

Contents

require 'faye/websocket'
require 'broadcaster'
require 'class_config'

require_relative 'broadcaster/safe_event_machine'
require_relative 'broadcaster/version'

module Rasti
  module Web
    class Broadcaster

      extend ClassConfig

      attr_config :id,             'rasti.web.broadcaster'
      attr_config :redis_client,   Redic
      attr_config :redis_settings, 'redis://localhost:6379'
      attr_config :logger,         Logger.new(STDOUT)

      @mutex = Mutex.new

      class << self

        extend Forwardable

        def_delegators :broadcaster, :subscribe, 
                                     :unsubscribe, 
                                     :publish

        private

        def broadcaster
          @mutex.synchronize do
            @broadcaster ||= ::Broadcaster.new configuration
          end
        end

      end

      def initialize(app, headers={})
        @app = app
        @headers = headers
      end

      def call(env)
        if Faye::EventSource.eventsource? env
          event_source = Faye::EventSource.new env, headers: @headers
          channel = env['PATH_INFO'][1..-1]

          subscription_id = self.class.subscribe channel do |message|
            event_source.send message[:data], event: message[:event], 
                                              id:    message[:id]
          end

          event_source.on :close do
            self.class.unsubscribe subscription_id
            event_source = nil
          end

          event_source.rack_response
        else
          app.call env
        end
      end

      private

      attr_reader :app

    end
  end
end

Version data entries

1 entries across 1 versions & 1 rubygems

Version Path
rasti-web-broadcaster-1.1.0 lib/rasti/web/broadcaster.rb