lib/grumlin/client.rb in grumlin-0.3.0 vs lib/grumlin/client.rb in grumlin-0.4.0
- old
+ new
@@ -1,48 +1,52 @@
# frozen_string_literal: true
module Grumlin
class Client
- class PoolResource < self
- attr :concurrency, :count
+ class PoolResource < Async::Pool::Resource
+ attr_reader :client
def self.call
- new(Grumlin.config.url, concurrency: Grumlin.config.client_concurrency).tap(&:connect)
+ config = Grumlin.config
+ new(config.url, client_factory: config.client_factory, concurrency: config.client_concurrency)
end
- def initialize(url, concurrency: 1, parent: Async::Task.current)
- super(url, parent: parent)
- @concurrency = concurrency
- @count = 0
+ def initialize(url, client_factory:, concurrency: 1, parent: Async::Task.current)
+ super(concurrency)
+ @client = client_factory.call(url, parent).tap(&:connect)
end
- def viable?
- connected?
+ def closed?
+ !@client.connected?
end
- def closed?
- connected?
+ def close
+ @client.close
end
- def reusable?
- true
+ def write(*args)
+ @client.write(*args)
end
end
- def initialize(url, parent: Async::Task.current)
+ def initialize(url, parent: Async::Task.current, **client_options)
+ @url = url
+ @client_options = client_options
@parent = parent
- @transport = Transport.new(url)
reset!
end
def connect
- response_queue = @transport.connect
+ @transport = build_transport
+ response_channel = @transport.connect
@request_dispatcher = RequestDispatcher.new
@parent.async do
- response_queue.each do |response|
+ response_channel.each do |response|
@request_dispatcher.add_response(response)
end
+ rescue StandardError
+ close
end
end
def close
@transport.close
@@ -50,35 +54,32 @@
reset!
end
def connected?
- @transport.connected?
+ @transport&.connected? || false
end
# TODO: support yielding
def write(*args) # rubocop:disable Metrics/MethodLength
+ raise NotConnectedError unless connected?
+
request_id = SecureRandom.uuid
request = to_query(request_id, args)
- queue = @request_dispatcher.add_request(request)
+ channel = @request_dispatcher.add_request(request)
@transport.write(request)
begin
- msg, response = queue.dequeue
- raise response if msg == :error
-
- return response.flat_map { |item| Typing.cast(item) } if msg == :result
-
- raise "ERROR"
+ channel.dequeue.flat_map { |item| Typing.cast(item) }
rescue Async::Stop
retry if @request_dispatcher.ongoing_request?(request_id)
- raise UnknownRequestStopped, "#{request_id} is not in the ongoing requests list"
+ raise Grumlin::UnknownRequestStoppedError, "#{request_id} is not in the ongoing requests list"
end
end
def inspect
- "<#{self.class} url=#{@transport.url}>"
+ "<#{self.class} url=#{@url} connected=#{connected?}>"
end
alias to_s inspect
private
@@ -95,8 +96,13 @@
}
end
def reset!
@request_dispatcher = nil
+ @transport = nil
+ end
+
+ def build_transport
+ Transport.new(@url, parent: @parent, **@client_options)
end
end
end