lib/grumlin/client.rb in grumlin-0.3.0 vs lib/grumlin/client.rb in grumlin-0.4.0

- old
+ new

@@ -1,48 +1,52 @@ # frozen_string_literal: true module Grumlin class Client - class PoolResource < self - attr :concurrency, :count + class PoolResource < Async::Pool::Resource + attr_reader :client def self.call - new(Grumlin.config.url, concurrency: Grumlin.config.client_concurrency).tap(&:connect) + config = Grumlin.config + new(config.url, client_factory: config.client_factory, concurrency: config.client_concurrency) end - def initialize(url, concurrency: 1, parent: Async::Task.current) - super(url, parent: parent) - @concurrency = concurrency - @count = 0 + def initialize(url, client_factory:, concurrency: 1, parent: Async::Task.current) + super(concurrency) + @client = client_factory.call(url, parent).tap(&:connect) end - def viable? - connected? + def closed? + !@client.connected? end - def closed? - connected? + def close + @client.close end - def reusable? - true + def write(*args) + @client.write(*args) end end - def initialize(url, parent: Async::Task.current) + def initialize(url, parent: Async::Task.current, **client_options) + @url = url + @client_options = client_options @parent = parent - @transport = Transport.new(url) reset! end def connect - response_queue = @transport.connect + @transport = build_transport + response_channel = @transport.connect @request_dispatcher = RequestDispatcher.new @parent.async do - response_queue.each do |response| + response_channel.each do |response| @request_dispatcher.add_response(response) end + rescue StandardError + close end end def close @transport.close @@ -50,35 +54,32 @@ reset! end def connected? - @transport.connected? + @transport&.connected? || false end # TODO: support yielding def write(*args) # rubocop:disable Metrics/MethodLength + raise NotConnectedError unless connected? + request_id = SecureRandom.uuid request = to_query(request_id, args) - queue = @request_dispatcher.add_request(request) + channel = @request_dispatcher.add_request(request) @transport.write(request) begin - msg, response = queue.dequeue - raise response if msg == :error - - return response.flat_map { |item| Typing.cast(item) } if msg == :result - - raise "ERROR" + channel.dequeue.flat_map { |item| Typing.cast(item) } rescue Async::Stop retry if @request_dispatcher.ongoing_request?(request_id) - raise UnknownRequestStopped, "#{request_id} is not in the ongoing requests list" + raise Grumlin::UnknownRequestStoppedError, "#{request_id} is not in the ongoing requests list" end end def inspect - "<#{self.class} url=#{@transport.url}>" + "<#{self.class} url=#{@url} connected=#{connected?}>" end alias to_s inspect private @@ -95,8 +96,13 @@ } end def reset! @request_dispatcher = nil + @transport = nil + end + + def build_transport + Transport.new(@url, parent: @parent, **@client_options) end end end