lib/grumlin/transport.rb in grumlin-0.3.0 vs lib/grumlin/transport.rb in grumlin-0.4.0
- old
+ new
@@ -2,67 +2,75 @@
module Grumlin
class Transport
# A transport based on https://github.com/socketry/async
# and https://github.com/socketry/async-websocket
- def initialize(url, parent: Async::Task.current)
- @endpoint = Async::HTTP::Endpoint.parse(url)
+
+ attr_reader :url
+
+ def initialize(url, parent: Async::Task.current, **client_options)
+ @url = url
@parent = parent
- @request_queue = Async::Queue.new
- @response_queue = Async::Queue.new
+ @client_options = client_options
+ @request_channel = Async::Channel.new
+ @response_channel = Async::Channel.new
reset!
end
- def url
- @endpoint.url
- end
-
def connected?
@connected
end
- def connect # rubocop:disable Metrics/MethodLength
+ def connect # rubocop:disable Metrics/MethodLength, Metrics/AbcSize
raise AlreadyConnectedError if connected?
- @connection = Async::WebSocket::Client.connect(@endpoint)
+ @connection = Async::WebSocket::Client.connect(Async::HTTP::Endpoint.parse(@url), **@client_options)
@response_task = @parent.async do
loop do
data = @connection.read
- @response_queue << data
+ @response_channel << data
end
rescue Async::Stop
- @response_queue << nil
+ @response_channel.close
+ rescue StandardError => e
+ @response_channel.exception(e)
end
@request_task = @parent.async do
- @request_queue.each do |message|
+ @request_channel.each do |message|
@connection.write(message)
@connection.flush
end
+ rescue StandardError => e
+ @response_channel.exception(e)
end
@connected = true
- @response_queue
+ @response_channel
end
def write(message)
raise NotConnectedError unless connected?
- @request_queue << message
+ @request_channel << message
end
- def close
- raise NotConnectedError unless connected?
+ def close # rubocop:disable Metrics/MethodLength
+ return unless connected?
- @request_queue << nil
+ @request_channel.close
@request_task.wait
@response_task.stop
@response_task.wait
- @connection.close
+ begin
+ @connection.close
+ rescue Errno::EPIPE
+ nil
+ end
reset!
end
private