lib/reel/websocket.rb in reel-0.5.0 vs lib/reel/websocket.rb in reel-0.6.0.pre1

- old
+ new

@@ -1,53 +1,44 @@ require 'forwardable' -require 'websocket_parser' +require 'websocket/driver' +require 'rack' module Reel class WebSocket extend Forwardable include ConnectionMixin include RequestMixin attr_reader :socket def_delegators :@socket, :addr, :peeraddr - def initialize(info, socket) + def initialize(info, connection) + driver_env = DriverEnvironment.new(info, connection.socket) + + @socket = connection.hijack_socket @request_info = info - @socket = socket - handshake = ::WebSocket::ClientHandshake.new(:get, url, headers) - - if handshake.valid? - response = handshake.accept_response - response.render(socket) - else - error = handshake.errors.first - - response = Response.new(400) - response.reason = handshake.errors.first - response.render(@socket) - - raise HandshakeError, "error during handshake: #{error}" + @driver = ::WebSocket::Driver.rack(driver_env) + @driver.on(:close) do + @socket.close end - @parser = ::WebSocket::Parser.new + @message_stream = MessageStream.new(@socket, @driver) + @driver.start + rescue EOFError + close + end - @parser.on_close do |status, reason| - # According to the spec the server must respond with another - # close message before closing the connection - @socket << ::WebSocket::Message.close.to_data - close + def on_message(&block) + @driver.on :message do |message| + block.(message.data) end - - @parser.on_ping do |payload| - @socket << ::WebSocket::Message.pong(payload).to_data - end end - [:next_message, :next_messages, :on_message, :on_error, :on_close, :on_ping, :on_pong].each do |meth| - define_method meth do |&proc| - @parser.send __method__, &proc + [:error, :close, :ping, :pong].each do |meth| + define_method "on_#{meth}" do |&proc| + @driver.send(:on, meth, &proc) end end def read_every(n, unit = :s) cancel_timer! # only one timer allowed per stream @@ -63,43 +54,85 @@ end alias read_interval read_every alias read_frequency read_every def read - @parser.append @socket.readpartial(Connection::BUFFER_SIZE) until msg = @parser.next_message - msg - rescue - cancel_timer! - raise + @message_stream.read end - def body - nil + def closed? + @socket.closed? end def write(msg) - @socket << ::WebSocket::Message.new(msg).to_data - msg + if msg.is_a? String + @driver.text(msg) + elsif msg.is_a? Array + @driver.binary(msg) + else + raise "Can only send byte array or string over driver." + end rescue IOError, Errno::ECONNRESET, Errno::EPIPE cancel_timer! raise SocketError, "error writing to socket" rescue cancel_timer! raise end alias_method :<<, :write - def closed? - @socket.closed? - end - def close - cancel_timer! - @socket.close unless closed? + @driver.close + @socket.close end def cancel_timer! @timer && @timer.cancel end + private + + class DriverEnvironment + extend Forwardable + + attr_reader :env, :url, :socket + + def_delegators :socket, :write + + def initialize(info, socket) + @url = info.url + + env_hash = Hash[info.headers.map { |key, value| ['HTTP_' + key.upcase.gsub('-','_'),value ] }] + + env_hash.merge!({ + :method => info.method, + :input => info.body.to_s, + 'REMOTE_ADDR' => info.remote_addr + }) + + @env = ::Rack::MockRequest.env_for(@url, env_hash) + + @socket = socket + end + end + + class MessageStream + def initialize(socket, driver) + @socket = socket + @driver = driver + @message_buffer = [] + + @driver.on :message do |message| + @message_buffer.push(message.data) + end + end + + def read + while @message_buffer.empty? + buffer = @socket.readpartial(Connection::BUFFER_SIZE) + @driver.parse(buffer) + end + @message_buffer.shift + end + end end end