lib/grumlin/client.rb in grumlin-0.6.2 vs lib/grumlin/client.rb in grumlin-0.7.0

- old
+ new

@@ -11,10 +11,11 @@ end def initialize(url, client_factory:, concurrency: 1, parent: Async::Task.current) super(concurrency) @client = client_factory.call(url, parent).tap(&:connect) + @parent = parent end def closed? !@client.connected? end @@ -23,10 +24,12 @@ @client.close end def write(*args) @client.write(*args) + ensure + @count += 1 end def viable? !closed? end @@ -34,37 +37,58 @@ def reusable? !closed? end end + include Console + + # Client is not reusable. Once closed should be recreated. def initialize(url, parent: Async::Task.current, **client_options) @url = url @client_options = client_options @parent = parent - reset! + @request_dispatcher = nil + @transport = nil end def connect + raise "ClientClosed" if @closed + @transport = build_transport response_channel = @transport.connect @request_dispatcher = RequestDispatcher.new - @parent.async do + @response_task = @parent.async do response_channel.each do |response| @request_dispatcher.add_response(response) end - rescue StandardError - close + rescue Async::Stop, Async::TimeoutError, StandardError + close(check_requests: false) end + logger.debug(self, "Connected") end - def close + # Before calling close the user must ensure that: + # 1) There are no ongoing requests + # 2) There will be no new writes after + def close(check_requests: true) # rubocop:disable Metrics/CyclomaticComplexity,Metrics/PerceivedComplexity + return if @closed + + @closed = true + @transport&.close - if @request_dispatcher&.requests&.any? - raise ResourceLeakError, "Request list is not empty: #{@request_dispatcher.requests}" - end + @transport&.wait - reset! + @response_task&.stop + @response_task&.wait + + return if @request_dispatcher&.requests&.empty? + + @request_dispatcher.clear unless check_requests + + raise ResourceLeakError, "Request list is not empty: #{@request_dispatcher.requests}" if check_requests + ensure + logger.debug(self, "Closed") end def connected? @transport&.connected? || false end @@ -78,13 +102,13 @@ channel = @request_dispatcher.add_request(request) @transport.write(request) begin channel.dequeue.flat_map { |item| Typing.cast(item) } - rescue Async::Stop - retry if @request_dispatcher.ongoing_request?(request_id) - raise Grumlin::UnknownRequestStoppedError, "#{request_id} is not in the ongoing requests list" + rescue Async::Stop, Async::TimeoutError + close(check_requests: false) + raise end end def inspect "<#{self.class} url=#{@url} connected=#{connected?}>" @@ -102,14 +126,9 @@ args: { gremlin: Typing.to_bytecode(Translator.to_bytecode_query(message)), aliases: { g: :g } } } - end - - def reset! - @request_dispatcher = nil - @transport = nil end def build_transport Transport.new(@url, parent: @parent, **@client_options) end