Sha256: db83e841726350768669d3dd08f1e44709aac030ba72525a63d214f723b9fb9d

Contents?: true

Size: 1.64 KB

Versions: 2

Compression:

Stored size: 1.64 KB

Contents

require 'faye/websocket'
require 'broadcaster'
require 'class_config'

require_relative 'broadcaster/safe_event_machine'
require_relative 'broadcaster/safe_rack_lint'
require_relative 'broadcaster/version'

module Rasti
  module Web
    class Broadcaster

      extend ClassConfig

      attr_config :id,             'rasti.web.broadcaster'
      attr_config :redis_client,   Redic
      attr_config :redis_settings, 'redis://localhost:6379'
      attr_config :logger,         Logger.new(STDOUT)

      @mutex = Mutex.new

      class << self

        extend Forwardable

        def_delegators :broadcaster, :subscribe, 
                                     :unsubscribe, 
                                     :publish

        private

        def broadcaster
          @mutex.synchronize do
            @broadcaster ||= ::Broadcaster.new configuration
          end
        end

      end

      def initialize(app, headers={})
        @app = app
        @headers = headers
      end

      def call(env)
        if Faye::EventSource.eventsource? env
          event_source = Faye::EventSource.new env, headers: @headers
          channel = env['PATH_INFO'][1..-1]

          subscription_id = self.class.subscribe channel do |message|
            event_source.send message[:data], event: message[:event], 
                                              id:    message[:id]
          end

          event_source.on :close do
            self.class.unsubscribe subscription_id
            event_source = nil
          end

          event_source.rack_response
        else
          app.call env
        end
      end

      private

      attr_reader :app

    end
  end
end

Version data entries

2 entries across 2 versions & 1 rubygems

Version Path
rasti-web-broadcaster-1.1.2 lib/rasti/web/broadcaster.rb
rasti-web-broadcaster-1.1.1 lib/rasti/web/broadcaster.rb