lib/grumlin/client.rb in grumlin-0.23.0 vs lib/grumlin/client.rb in grumlin-1.0.0.rc1
- old
+ new
@@ -1,125 +1,123 @@
# frozen_string_literal: true
-module Grumlin
- class Client
- class PoolResource < Async::Pool::Resource
- attr_reader :client
+class Grumlin::Client
+ class PoolResource < Async::Pool::Resource
+ attr_reader :client
- def self.call
- config = Grumlin.config
- new(config.url, client_factory: config.client_factory, concurrency: config.client_concurrency)
- end
+ def self.call
+ config = Grumlin.config
+ new(config.url, client_factory: config.client_factory, concurrency: config.client_concurrency)
+ end
- def initialize(url, client_factory:, concurrency: 1, parent: Async::Task.current)
- super(concurrency)
- @client = client_factory.call(url, parent).tap(&:connect)
- @parent = parent
- end
+ def initialize(url, client_factory:, concurrency: 1, parent: Async::Task.current)
+ super(concurrency)
+ @client = client_factory.call(url, parent).tap(&:connect)
+ @parent = parent
+ end
- def closed?
- !@client.connected?
- end
+ def closed?
+ !@client.connected?
+ end
- def close
- @client.close
- end
+ def close
+ @client.close
+ end
- def write(query)
- @client.write(query)
- ensure
- @count += 1
- end
+ def write(query)
+ @client.write(query)
+ ensure
+ @count += 1
+ end
- def viable?
- !closed?
- end
+ def viable?
+ !closed?
+ end
- def reusable?
- !closed?
- end
+ def reusable?
+ !closed?
end
+ end
- include Console
+ include Console
- # Client is not reusable. Once closed should be recreated.
- def initialize(url, parent: Async::Task.current, **client_options)
- @url = url
- @client_options = client_options
- @parent = parent
- @request_dispatcher = nil
- @transport = nil
- end
+ # Client is not reusable. Once closed should be recreated.
+ def initialize(url, parent: Async::Task.current, **client_options)
+ @url = url
+ @client_options = client_options
+ @parent = parent
+ @request_dispatcher = nil
+ @transport = nil
+ end
- def connect
- raise ClientClosedError if @closed
+ def connect
+ raise ClientClosedError if @closed
- @transport = build_transport
- response_channel = @transport.connect
- @request_dispatcher = RequestDispatcher.new
- @response_task = @parent.async do
- response_channel.each do |response|
- @request_dispatcher.add_response(response)
- end
- rescue Async::Stop, Async::TimeoutError, StandardError
- close(check_requests: false)
+ @transport = build_transport
+ response_channel = @transport.connect
+ @request_dispatcher = Grumlin::RequestDispatcher.new
+ @response_task = @parent.async do
+ response_channel.each do |response|
+ @request_dispatcher.add_response(response)
end
- logger.debug(self, "Connected")
+ rescue Async::Stop, Async::TimeoutError, StandardError
+ close(check_requests: false)
end
+ logger.debug(self, "Connected")
+ end
- # Before calling close the user must ensure that:
- # 1) There are no ongoing requests
- # 2) There will be no new writes after
- def close(check_requests: true) # rubocop:disable Metrics/CyclomaticComplexity,Metrics/PerceivedComplexity
- return if @closed
+ # Before calling close the user must ensure that:
+ # 1) There are no ongoing requests
+ # 2) There will be no new writes after
+ def close(check_requests: true) # rubocop:disable Metrics/CyclomaticComplexity,Metrics/PerceivedComplexity
+ return if @closed
- @closed = true
+ @closed = true
- @transport&.close
- @transport&.wait
+ @transport&.close
+ @transport&.wait
- @response_task&.stop
- @response_task&.wait
+ @response_task&.stop
+ @response_task&.wait
- return if @request_dispatcher&.requests&.empty?
+ return if @request_dispatcher&.requests&.empty?
- @request_dispatcher.clear unless check_requests
+ @request_dispatcher.clear unless check_requests
- raise ResourceLeakError, "Request list is not empty: #{@request_dispatcher.requests}" if check_requests
- ensure
- logger.debug(self, "Closed")
- end
+ raise ResourceLeakError, "Request list is not empty: #{@request_dispatcher.requests}" if check_requests
+ ensure
+ logger.debug(self, "Closed")
+ end
- def connected?
- @transport&.connected? || false
- end
+ def connected?
+ @transport&.connected? || false
+ end
- # TODO: support yielding
- def write(query)
- raise NotConnectedError unless connected?
+ # TODO: support yielding
+ def write(query)
+ raise NotConnectedError unless connected?
- channel = @request_dispatcher.add_request(query)
- begin
- @transport.write(query)
- channel.dequeue
- rescue Async::Stop, Async::TimeoutError
- close(check_requests: false)
- raise
- end
+ channel = @request_dispatcher.add_request(query)
+ begin
+ @transport.write(query)
+ channel.dequeue
+ rescue Async::Stop, Async::TimeoutError
+ close(check_requests: false)
+ raise
end
+ end
- def inspect
- "<#{self.class} url=#{@url} connected=#{connected?}>"
- end
+ def inspect
+ "<#{self.class} url=#{@url} connected=#{connected?}>"
+ end
- def to_s
- inspect
- end
+ def to_s
+ inspect
+ end
- private
+ private
- # This might be overridden in successors
- def build_transport
- Transport.new(@url, parent: @parent, **@client_options)
- end
+ # This might be overridden in successors
+ def build_transport
+ Grumlin::Transport.new(@url, parent: @parent, **@client_options)
end
end