lib/reel/websocket.rb in reel-0.5.0 vs lib/reel/websocket.rb in reel-0.6.0.pre1
- old
+ new
@@ -1,53 +1,44 @@
require 'forwardable'
-require 'websocket_parser'
+require 'websocket/driver'
+require 'rack'
module Reel
class WebSocket
extend Forwardable
include ConnectionMixin
include RequestMixin
attr_reader :socket
def_delegators :@socket, :addr, :peeraddr
- def initialize(info, socket)
+ def initialize(info, connection)
+ driver_env = DriverEnvironment.new(info, connection.socket)
+
+ @socket = connection.hijack_socket
@request_info = info
- @socket = socket
- handshake = ::WebSocket::ClientHandshake.new(:get, url, headers)
-
- if handshake.valid?
- response = handshake.accept_response
- response.render(socket)
- else
- error = handshake.errors.first
-
- response = Response.new(400)
- response.reason = handshake.errors.first
- response.render(@socket)
-
- raise HandshakeError, "error during handshake: #{error}"
+ @driver = ::WebSocket::Driver.rack(driver_env)
+ @driver.on(:close) do
+ @socket.close
end
- @parser = ::WebSocket::Parser.new
+ @message_stream = MessageStream.new(@socket, @driver)
+ @driver.start
+ rescue EOFError
+ close
+ end
- @parser.on_close do |status, reason|
- # According to the spec the server must respond with another
- # close message before closing the connection
- @socket << ::WebSocket::Message.close.to_data
- close
+ def on_message(&block)
+ @driver.on :message do |message|
+ block.(message.data)
end
-
- @parser.on_ping do |payload|
- @socket << ::WebSocket::Message.pong(payload).to_data
- end
end
- [:next_message, :next_messages, :on_message, :on_error, :on_close, :on_ping, :on_pong].each do |meth|
- define_method meth do |&proc|
- @parser.send __method__, &proc
+ [:error, :close, :ping, :pong].each do |meth|
+ define_method "on_#{meth}" do |&proc|
+ @driver.send(:on, meth, &proc)
end
end
def read_every(n, unit = :s)
cancel_timer! # only one timer allowed per stream
@@ -63,43 +54,85 @@
end
alias read_interval read_every
alias read_frequency read_every
def read
- @parser.append @socket.readpartial(Connection::BUFFER_SIZE) until msg = @parser.next_message
- msg
- rescue
- cancel_timer!
- raise
+ @message_stream.read
end
- def body
- nil
+ def closed?
+ @socket.closed?
end
def write(msg)
- @socket << ::WebSocket::Message.new(msg).to_data
- msg
+ if msg.is_a? String
+ @driver.text(msg)
+ elsif msg.is_a? Array
+ @driver.binary(msg)
+ else
+ raise "Can only send byte array or string over driver."
+ end
rescue IOError, Errno::ECONNRESET, Errno::EPIPE
cancel_timer!
raise SocketError, "error writing to socket"
rescue
cancel_timer!
raise
end
alias_method :<<, :write
- def closed?
- @socket.closed?
- end
-
def close
- cancel_timer!
- @socket.close unless closed?
+ @driver.close
+ @socket.close
end
def cancel_timer!
@timer && @timer.cancel
end
+ private
+
+ class DriverEnvironment
+ extend Forwardable
+
+ attr_reader :env, :url, :socket
+
+ def_delegators :socket, :write
+
+ def initialize(info, socket)
+ @url = info.url
+
+ env_hash = Hash[info.headers.map { |key, value| ['HTTP_' + key.upcase.gsub('-','_'),value ] }]
+
+ env_hash.merge!({
+ :method => info.method,
+ :input => info.body.to_s,
+ 'REMOTE_ADDR' => info.remote_addr
+ })
+
+ @env = ::Rack::MockRequest.env_for(@url, env_hash)
+
+ @socket = socket
+ end
+ end
+
+ class MessageStream
+ def initialize(socket, driver)
+ @socket = socket
+ @driver = driver
+ @message_buffer = []
+
+ @driver.on :message do |message|
+ @message_buffer.push(message.data)
+ end
+ end
+
+ def read
+ while @message_buffer.empty?
+ buffer = @socket.readpartial(Connection::BUFFER_SIZE)
+ @driver.parse(buffer)
+ end
+ @message_buffer.shift
+ end
+ end
end
end