Sha256: efd611fb578e332a679a2847533998032c82edf91d133dc0319a1b584d716594

Contents?: true

Size: 1.53 KB

Versions: 1

Compression:

Stored size: 1.53 KB

Contents

# frozen_string_literal: true

module Grumlin
  # A transport based on https://github.com/socketry/async
  # and https://github.com/socketry/async-websocket
  #
  # Outgoing messages are buffered in a request queue and written to the
  # connection by a background task; a second background task reads from the
  # connection and pushes whatever it receives onto the response queue that
  # #connect returns.
  class Transport
    # Builds a disconnected transport; no I/O happens until #connect.
    #
    # @param url [String] endpoint URL, handed to Async::HTTP::Endpoint.parse
    # @param parent [#async] task used to spawn the reader and writer tasks
    def initialize(url, parent: Async::Task.current)
      @endpoint = Async::HTTP::Endpoint.parse(url)
      @parent = parent
      @request_queue = Async::Queue.new
      @response_queue = Async::Queue.new
      reset!
    end

    # @return the URL reported by the parsed endpoint
    def url
      @endpoint.url
    end

    # @return [Boolean] true between a successful #connect and the next #close
    def connected?
      @connected
    end

    # Opens the websocket connection and starts the reader and writer tasks.
    #
    # @return [Async::Queue] the queue on which received data is published;
    #   a nil is pushed onto it when the reader task is stopped
    # @raise [AlreadyConnectedError] if the transport is already connected
    def connect # rubocop:disable Metrics/MethodLength
      raise AlreadyConnectedError if connected?

      @connection = Async::WebSocket::Client.connect(@endpoint)

      # Reader: forwards everything read from the connection to consumers.
      @response_task = @parent.async do
        loop do
          data = @connection.read
          @response_queue << data
        end
      rescue Async::Stop
        # Let consumers of the response queue know that no more data follows.
        @response_queue << nil
      end

      # Writer: sends queued messages one by one, flushing after each.
      @request_task = @parent.async do
        @request_queue.each do |message|
          @connection.write(message)
          @connection.flush
        end
      end

      @connected = true

      @response_queue
    end

    # Queues a message for the writer task.
    #
    # @param message the payload to send. NOTE(review): #close pushes nil to
    #   end the writer task, so a nil message presumably stops it too - confirm.
    # @raise [NotConnectedError] if the transport is not connected
    def write(message)
      raise NotConnectedError unless connected?

      @request_queue << message
    end

    # Flushes pending requests, stops both tasks and closes the connection.
    #
    # Every teardown step runs even if an earlier one raises, and the
    # transport always ends up disconnected, so a failed close cannot leave it
    # stuck in the connected state with stale tasks. The error still
    # propagates to the caller.
    #
    # @raise [NotConnectedError] if the transport is not connected
    def close
      raise NotConnectedError unless connected?

      begin
        # Pushed after all pending messages so the writer task finishes them
        # first; the wait below blocks until it has.
        @request_queue << nil
        @request_task.wait
      ensure
        stop_reader
      end
    end

    private

    # Stops the reader task, then closes the connection no matter what.
    def stop_reader
      @response_task.stop
      @response_task.wait
    ensure
      close_connection
    end

    # Closes the connection and forgets all per-connection state.
    def close_connection
      @connection.close
    ensure
      reset!
    end

    # Puts the transport into the disconnected state.
    def reset!
      @connected = false
      @connection = nil
      @response_task = nil
      @request_task = nil
    end
  end
end

Version data entries

1 entry across 1 version & 1 rubygem

Version Path
grumlin-0.3.0 lib/grumlin/transport.rb