lib/grumlin/transport.rb in grumlin-0.3.0 vs lib/grumlin/transport.rb in grumlin-0.4.0

- old
+ new

@@ -2,67 +2,75 @@ module Grumlin class Transport # A transport based on https://github.com/socketry/async # and https://github.com/socketry/async-websocket - def initialize(url, parent: Async::Task.current) - @endpoint = Async::HTTP::Endpoint.parse(url) + + attr_reader :url + + def initialize(url, parent: Async::Task.current, **client_options) + @url = url @parent = parent - @request_queue = Async::Queue.new - @response_queue = Async::Queue.new + @client_options = client_options + @request_channel = Async::Channel.new + @response_channel = Async::Channel.new reset! end - def url - @endpoint.url - end - def connected? @connected end - def connect # rubocop:disable Metrics/MethodLength + def connect # rubocop:disable Metrics/MethodLength, Metrics/AbcSize raise AlreadyConnectedError if connected? - @connection = Async::WebSocket::Client.connect(@endpoint) + @connection = Async::WebSocket::Client.connect(Async::HTTP::Endpoint.parse(@url), **@client_options) @response_task = @parent.async do loop do data = @connection.read - @response_queue << data + @response_channel << data end rescue Async::Stop - @response_queue << nil + @response_channel.close + rescue StandardError => e + @response_channel.exception(e) end @request_task = @parent.async do - @request_queue.each do |message| + @request_channel.each do |message| @connection.write(message) @connection.flush end + rescue StandardError => e + @response_channel.exception(e) end @connected = true - @response_queue + @response_channel end def write(message) raise NotConnectedError unless connected? - @request_queue << message + @request_channel << message end - def close - raise NotConnectedError unless connected? + def close # rubocop:disable Metrics/MethodLength + return unless connected? - @request_queue << nil + @request_channel.close @request_task.wait @response_task.stop @response_task.wait - @connection.close + begin + @connection.close + rescue Errno::EPIPE + nil + end reset! end private