lib/cucumber/pro/web_socket/session.rb in cucumber-pro-0.0.11 vs lib/cucumber/pro/web_socket/session.rb in cucumber-pro-0.0.12
- old
+ new
@@ -1,27 +1,28 @@
require 'json'
require 'faye/websocket'
require 'eventmachine'
+require 'cucumber/pro/errors'
module Cucumber
module Pro
module WebSocket
class Session
- def initialize(url, logger)
+ def initialize(url, logger, options)
@url, @logger = url, logger
create_socket = -> worker {
ws = Faye::WebSocket::Client.new(@url, nil, ping: 15)
ws.on :open, &worker.method(:on_open)
ws.on :error, &worker.method(:on_error)
ws.on :message, &worker.method(:on_message)
ws.on :close, &worker.method(:on_close)
ws
}
@queue = Queue.new
- @socket = Worker.new(create_socket, logger, self)
+ @socket = Worker.new(create_socket, logger, self, options)
end
def send(message)
logger.debug [:session, :send, message]
socket.send(message.to_json)
@@ -47,22 +48,24 @@
attr_reader :logger, :queue, :socket
end
class Worker
- def initialize(create_socket, logger, error_handler)
+ def initialize(create_socket, logger, error_handler, options = {})
@create_socket, @logger, @error_handler = create_socket, logger, error_handler
+ @timeout = options.fetch(:timeout) { raise ArgumentError("Please specify timeout") }
@q = Queue.new
@em = Thread.new { start_client }
@ack_count = 0
end
def close
@q << -> {
if @ack_count == 0
- @ws.close
+ close_websocket
else
+ ensure_close_timer_started
EM.next_tick { close }
end
}
self
end
@@ -79,10 +82,28 @@
!@em.alive?
end
private
- attr_reader :logger, :error_handler, :next_task
+ attr_reader :logger, :error_handler, :next_task, :timeout
+
+ def ensure_close_timer_started
+ return if @close_timer
+ logger.debug [:ws, :set_close_timeout, timeout]
+ @close_timer = EM.add_timer(timeout) { handle_close_timeout }
+ end
+
+ def handle_close_timeout
+ logger.debug [:ws, :handle_close_timeout]
+ return unless @ws
+ error_handler.error Error::Timeout.new
+ close_websocket
+ end
+
+ def close_websocket
+ logger.debug [:ws, :close_socket]
+ @ws.close
+ end
def start_client
EM.run do
logger.debug [:ws, :start]
@ws = @create_socket.call(self)