lib/grumlin/transport.rb in grumlin-0.6.2 vs lib/grumlin/transport.rb in grumlin-0.7.0
- old
+ new
@@ -3,84 +3,99 @@
module Grumlin
class Transport
# A transport based on https://github.com/socketry/async
# and https://github.com/socketry/async-websocket
+ include Console
+
attr_reader :url
+ # Transport is not reusable. Once closed should be recreated.
def initialize(url, parent: Async::Task.current, **client_options)
@url = url
@parent = parent
@client_options = client_options
@request_channel = Async::Channel.new
@response_channel = Async::Channel.new
- reset!
end
def connected?
- @connected
+ !@connection.nil?
end
- def connect # rubocop:disable Metrics/AbcSize, Metrics/MethodLength
+ def connect
+ raise "ClientClosed" if @closed
raise AlreadyConnectedError if connected?
@connection = Async::WebSocket::Client.connect(Async::HTTP::Endpoint.parse(@url), **@client_options)
+ logger.debug(self) { "Connected to #{@url}." }
- @response_task = @parent.async do
- loop do
- data = @connection.read
- @response_channel << data
- end
- rescue Async::Stop
- @response_channel.close
- rescue StandardError => e
- @response_channel.exception(e)
- end
+ @response_task = @parent.async { run_response_task }
- @request_task = @parent.async do
- @request_channel.each do |message|
- @connection.write(message)
- @connection.flush
- end
- rescue StandardError => e
- @response_channel.exception(e)
- end
+ @request_task = @parent.async { run_request_task }
- @connected = true
-
@response_channel
end
def write(message)
raise NotConnectedError unless connected?
@request_channel << message
end
def close
- return unless connected?
+ return if @closed
+ @closed = true
+
@request_channel.close
- @request_task.wait
+ @response_channel.close
- @response_task.stop
- @response_task.wait
-
begin
@connection.close
- rescue Errno::EPIPE
+ rescue StandardError
nil
end
+ @connection = nil
- reset!
+ @request_task&.stop(true)
+ @response_task&.stop(true)
end
+ def wait
+ @request_task.wait
+ @response_task.wait
+ end
+
private
- def reset!
- @connected = false
- @connection = nil
- @response_task = nil
- @request_task = nil
+ def run_response_task
+ with_guard do
+ loop do
+ data = @connection.read
+ @response_channel << data
+ end
+ end
+ end
+
+ def run_request_task
+ with_guard do
+ @request_channel.each do |message|
+ @connection.write(message)
+ @connection.flush
+ end
+ end
+ end
+
+ def with_guard
+ yield
+ rescue Async::Stop, Async::TimeoutError, StandardError => e
+ logger.debug(self) { "Guard error, closing." }
+ begin
+ @response_channel.exception(e)
+ rescue Async::Channel::ChannelClosedError
+ nil
+ end
+ close
end
end
end