Sha256: 5669fb74d277df5bc16b1a751b482994a11073dc80c874cac748cb7e32b3d91d

Contents?: true

Size: 1.87 KB

Versions: 3

Compression:

Stored size: 1.87 KB

Contents

module Faye
  # Wraps the transport behind a Rack request so it can be written to and
  # closed through one interface. Three entries of the request's Rack env are
  # consulted:
  #
  # * 'rack.hijack'   - when present it is called, and the IO then found under
  #                     'rack.hijack_io' is attached to EventMachine with
  #                     Reader as its handler;
  # * 'stream.send'   - a callable that #write invokes with each chunk when no
  #                     hijacked IO is in use;
  # * 'em.connection' - an object that receives the close calls and, if it
  #                     responds to +socket_stream+, a reference to this stream.
  #
  # #receive and #fail are empty here; presumably subclasses override them to
  # consume incoming data and react to the connection going away.
  class RackStream

    include EventMachine::Deferrable

    # Handler module handed to EventMachine.attach. It forwards incoming data
    # and the unbind notification to the stream stored in +stream+.
    module Reader
      attr_accessor :stream

      def receive_data(data)
        stream.receive(data)
      end

      def unbind
        stream.fail
      end
    end

    # socket - an object whose +env+ method returns the Rack env hash.
    def initialize(socket)
      @socket_object = socket
      @connection    = socket.env['em.connection']
      @stream_send   = socket.env['stream.send']

      # Define both hijack variables up front so the methods below never read
      # an unassigned instance variable when no hijack takes place.
      @rack_hijack_io = @rack_hijack_io_reader = nil

      hijack_rack_socket

      @connection.socket_stream = self if @connection.respond_to?(:socket_stream)
    end

    # Takes over the socket via the Rack hijack callable, if the env has one,
    # and attaches the resulting IO to EventMachine.
    #
    # The attach is handed to EventMachine.schedule. When the reactor is
    # running this method waits on a Queue until the scheduled block has
    # finished (successfully or not); when it is not running the method
    # returns immediately and the reader is only set once the block runs.
    def hijack_rack_socket
      hijack = @socket_object.env['rack.hijack']
      return unless hijack

      hijack.call
      # Keep the IO in a local as well: clean_rack_hijack may reset the
      # instance variable before the scheduled block below gets to run, and
      # the block still needs the real IO to attach (and then close) it.
      io = @rack_hijack_io = @socket_object.env['rack.hijack_io']
      queue = Queue.new

      EventMachine.schedule do
        begin
          EventMachine.attach(io, Reader) do |reader|
            reader.stream = self
            if @rack_hijack_io
              @rack_hijack_io_reader = reader
            else
              # The stream was closed before the attach happened.
              reader.close_connection_after_writing
            end
          end
        ensure
          # Always release the waiter, even if the attach raised.
          queue.push(nil)
        end
      end

      queue.pop if EventMachine.reactor_running?
    end

    # Closes the attached reader, if any, and forgets the hijacked IO.
    #
    # The reader can still be nil while the IO is set (attach not run yet, or
    # attach failed). In that case only the references are cleared; an attach
    # that runs later sees the cleared IO and closes its reader itself.
    def clean_rack_hijack
      return unless @rack_hijack_io
      @rack_hijack_io_reader.close_connection_after_writing if @rack_hijack_io_reader
      @rack_hijack_io = @rack_hijack_io_reader = nil
    end

    # Releases the hijacked IO and closes the underlying connection, if any.
    def close_connection
      clean_rack_hijack
      @connection.close_connection if @connection
    end

    # Same as #close_connection, but asks the connection to close only after
    # pending writes.
    def close_connection_after_writing
      clean_rack_hijack
      @connection.close_connection_after_writing if @connection
    end

    # Stores the given block as the send callback unless one is already set
    # (for example from env['stream.send']).
    def each(&callback)
      @stream_send ||= callback
    end

    # Empty hook; called by Reader#unbind and by #write on EOFError.
    def fail
    end

    # Empty hook; called by Reader#receive_data with each chunk read.
    def receive(data)
    end

    # Writes data to the hijacked IO if there is one, otherwise passes it to
    # the send callback if there is one; returns nil when neither exists.
    #
    # Writing is best-effort: an EOFError triggers #fail, and any other
    # StandardError is swallowed and yields nil.
    def write(data)
      return @rack_hijack_io.write(data) if @rack_hijack_io
      return @stream_send.call(data) if @stream_send
    rescue EOFError
      fail
    rescue StandardError
      nil
    end

  end
end

Version data entries

3 entries across 3 versions & 1 rubygem

Version Path
faye-websocket-0.10.3 lib/faye/rack_stream.rb
faye-websocket-0.10.2 lib/faye/rack_stream.rb
faye-websocket-0.10.1 lib/faye/rack_stream.rb