Sha256: f40419965852193a3b4cdfb31a06bc166ca8ba0b6e2618f4f967cebe45f3c5ae
Contents?: true
Size: 1.83 KB
Versions: 1
Compression:
Stored size: 1.83 KB
Contents
# frozen_string_literal: true module Grumlin class Transport # A transport based on https://github.com/socketry/async # and https://github.com/socketry/async-websocket attr_reader :url def initialize(url, parent: Async::Task.current, **client_options) @url = url @parent = parent @client_options = client_options @request_channel = Async::Channel.new @response_channel = Async::Channel.new reset! end def connected? @connected end def connect # rubocop:disable Metrics/MethodLength, Metrics/AbcSize raise AlreadyConnectedError if connected? @connection = Async::WebSocket::Client.connect(Async::HTTP::Endpoint.parse(@url), **@client_options) @response_task = @parent.async do loop do data = @connection.read @response_channel << data end rescue Async::Stop @response_channel.close rescue StandardError => e @response_channel.exception(e) end @request_task = @parent.async do @request_channel.each do |message| @connection.write(message) @connection.flush end rescue StandardError => e @response_channel.exception(e) end @connected = true @response_channel end def write(message) raise NotConnectedError unless connected? @request_channel << message end def close # rubocop:disable Metrics/MethodLength return unless connected? @request_channel.close @request_task.wait @response_task.stop @response_task.wait begin @connection.close rescue Errno::EPIPE nil end reset! end private def reset! @connected = false @connection = nil @response_task = nil @request_task = nil end end end
Version data entries
1 entries across 1 versions & 1 rubygems
Version | Path |
---|---|
grumlin-0.4.0 | lib/grumlin/transport.rb |