lib/grumlin/transport.rb in grumlin-0.23.0 vs lib/grumlin/transport.rb in grumlin-1.0.0.rc1

- old
+ new

@@ -1,101 +1,99 @@ # frozen_string_literal: true -module Grumlin - class Transport - # A transport based on https://github.com/socketry/async - # and https://github.com/socketry/async-websocket +class Grumlin::Transport + # A transport based on https://github.com/socketry/async + # and https://github.com/socketry/async-websocket - include Console + include Console - attr_reader :url + attr_reader :url - # Transport is not reusable. Once closed should be recreated. - def initialize(url, parent: Async::Task.current, **client_options) - @url = url - @parent = parent - @client_options = client_options - @request_channel = Async::Channel.new - @response_channel = Async::Channel.new - end + # Transport is not reusable. Once closed should be recreated. + def initialize(url, parent: Async::Task.current, **client_options) + @url = url + @parent = parent + @client_options = client_options + @request_channel = Async::Channel.new + @response_channel = Async::Channel.new + end - def connected? - !@connection.nil? - end + def connected? + !@connection.nil? + end - def connect - raise ClientClosedError if @closed - raise AlreadyConnectedError if connected? + def connect + raise ClientClosedError if @closed + raise AlreadyConnectedError if connected? - @connection = Async::WebSocket::Client.connect(Async::HTTP::Endpoint.parse(@url), **@client_options) - logger.debug(self) { "Connected to #{@url}." } + @connection = Async::WebSocket::Client.connect(Async::HTTP::Endpoint.parse(@url), **@client_options) + logger.debug(self) { "Connected to #{@url}." } - @response_task = @parent.async { run_response_task } + @response_task = @parent.async { run_response_task } - @request_task = @parent.async { run_request_task } + @request_task = @parent.async { run_request_task } - @response_channel - end + @response_channel + end - def write(message) - raise NotConnectedError unless connected? + def write(message) + raise NotConnectedError unless connected? - @request_channel << message - end + @request_channel << message + end - def close - return if @closed + def close + return if @closed - @closed = true + @closed = true - @request_channel.close - @response_channel.close + @request_channel.close + @response_channel.close - begin - @connection.close - rescue StandardError - nil - end - @connection = nil - - @request_task&.stop(true) - @response_task&.stop(true) + begin + @connection.close + rescue StandardError + nil end + @connection = nil - def wait - @request_task.wait - @response_task.wait - end + @request_task&.stop(true) + @response_task&.stop(true) + end - private + def wait + @request_task.wait + @response_task.wait + end - def run_response_task - with_guard do - loop do - data = @connection.read - @response_channel << data - end + private + + def run_response_task + with_guard do + loop do + data = @connection.read + @response_channel << data end end + end - def run_request_task - with_guard do - @request_channel.each do |message| - @connection.write(message) - @connection.flush - end + def run_request_task + with_guard do + @request_channel.each do |message| + @connection.write(message) + @connection.flush end end + end - def with_guard - yield - rescue Async::Stop, Async::TimeoutError, StandardError => e - logger.debug(self) { "Guard error, closing." } - begin - @response_channel.exception(e) - rescue Async::Channel::ChannelClosedError - nil - end - close + def with_guard + yield + rescue Async::Stop, Async::TimeoutError, StandardError => e + logger.debug(self) { "Guard error, closing." } + begin + @response_channel.exception(e) + rescue Async::Channel::ChannelClosedError + nil end + close end end