lib/grumlin/transport.rb in grumlin-0.6.2 vs lib/grumlin/transport.rb in grumlin-0.7.0

- old
+ new

@@ -3,84 +3,99 @@ module Grumlin class Transport # A transport based on https://github.com/socketry/async # and https://github.com/socketry/async-websocket + include Console + attr_reader :url + # Transport is not reusable. Once closed should be recreated. def initialize(url, parent: Async::Task.current, **client_options) @url = url @parent = parent @client_options = client_options @request_channel = Async::Channel.new @response_channel = Async::Channel.new - reset! end def connected? - @connected + !@connection.nil? end - def connect # rubocop:disable Metrics/AbcSize, Metrics/MethodLength + def connect + raise "ClientClosed" if @closed raise AlreadyConnectedError if connected? @connection = Async::WebSocket::Client.connect(Async::HTTP::Endpoint.parse(@url), **@client_options) + logger.debug(self) { "Connected to #{@url}." } - @response_task = @parent.async do - loop do - data = @connection.read - @response_channel << data - end - rescue Async::Stop - @response_channel.close - rescue StandardError => e - @response_channel.exception(e) - end + @response_task = @parent.async { run_response_task } - @request_task = @parent.async do - @request_channel.each do |message| - @connection.write(message) - @connection.flush - end - rescue StandardError => e - @response_channel.exception(e) - end + @request_task = @parent.async { run_request_task } - @connected = true - @response_channel end def write(message) raise NotConnectedError unless connected? @request_channel << message end def close - return unless connected? + return if @closed + @closed = true + @request_channel.close - @request_task.wait + @response_channel.close - @response_task.stop - @response_task.wait - begin @connection.close - rescue Errno::EPIPE + rescue StandardError nil end + @connection = nil - reset! + @request_task&.stop(true) + @response_task&.stop(true) end + def wait + @request_task.wait + @response_task.wait + end + private - def reset! - @connected = false - @connection = nil - @response_task = nil - @request_task = nil + def run_response_task + with_guard do + loop do + data = @connection.read + @response_channel << data + end + end + end + + def run_request_task + with_guard do + @request_channel.each do |message| + @connection.write(message) + @connection.flush + end + end + end + + def with_guard + yield + rescue Async::Stop, Async::TimeoutError, StandardError => e + logger.debug(self) { "Guard error, closing." } + begin + @response_channel.exception(e) + rescue Async::Channel::ChannelClosedError + nil + end + close end end end