lib/cucumber/pro/web_socket/session.rb in cucumber-pro-0.0.11 vs lib/cucumber/pro/web_socket/session.rb in cucumber-pro-0.0.12

- old
+ new

@@ -1,27 +1,28 @@ require 'json' require 'faye/websocket' require 'eventmachine' +require 'cucumber/pro/errors' module Cucumber module Pro module WebSocket class Session - def initialize(url, logger) + def initialize(url, logger, options) @url, @logger = url, logger create_socket = -> worker { ws = Faye::WebSocket::Client.new(@url, nil, ping: 15) ws.on :open, &worker.method(:on_open) ws.on :error, &worker.method(:on_error) ws.on :message, &worker.method(:on_message) ws.on :close, &worker.method(:on_close) ws } @queue = Queue.new - @socket = Worker.new(create_socket, logger, self) + @socket = Worker.new(create_socket, logger, self, options) end def send(message) logger.debug [:session, :send, message] socket.send(message.to_json) @@ -47,22 +48,24 @@ attr_reader :logger, :queue, :socket end class Worker - def initialize(create_socket, logger, error_handler) + def initialize(create_socket, logger, error_handler, options = {}) @create_socket, @logger, @error_handler = create_socket, logger, error_handler + @timeout = options.fetch(:timeout) { raise ArgumentError("Please specify timeout") } @q = Queue.new @em = Thread.new { start_client } @ack_count = 0 end def close @q << -> { if @ack_count == 0 - @ws.close + close_websocket else + ensure_close_timer_started EM.next_tick { close } end } self end @@ -79,10 +82,28 @@ !@em.alive? end private - attr_reader :logger, :error_handler, :next_task + attr_reader :logger, :error_handler, :next_task, :timeout + + def ensure_close_timer_started + return if @close_timer + logger.debug [:ws, :set_close_timeout, timeout] + @close_timer = EM.add_timer(timeout) { handle_close_timeout } + end + + def handle_close_timeout + logger.debug [:ws, :handle_close_timeout] + return unless @ws + error_handler.error Error::Timeout.new + close_websocket + end + + def close_websocket + logger.debug [:ws, :close_socket] + @ws.close + end def start_client EM.run do logger.debug [:ws, :start] @ws = @create_socket.call(self)