lib/grumlin/client.rb in grumlin-0.6.2 vs lib/grumlin/client.rb in grumlin-0.7.0
- old
+ new
@@ -11,10 +11,11 @@
end
def initialize(url, client_factory:, concurrency: 1, parent: Async::Task.current)
super(concurrency)
@client = client_factory.call(url, parent).tap(&:connect)
+ @parent = parent
end
def closed?
!@client.connected?
end
@@ -23,10 +24,12 @@
@client.close
end
def write(*args)
@client.write(*args)
+ ensure
+ @count += 1
end
def viable?
!closed?
end
@@ -34,37 +37,58 @@
def reusable?
!closed?
end
end
+ include Console
+
+ # Client is not reusable. Once closed should be recreated.
def initialize(url, parent: Async::Task.current, **client_options)
@url = url
@client_options = client_options
@parent = parent
- reset!
+ @request_dispatcher = nil
+ @transport = nil
end
def connect
+ raise "ClientClosed" if @closed
+
@transport = build_transport
response_channel = @transport.connect
@request_dispatcher = RequestDispatcher.new
- @parent.async do
+ @response_task = @parent.async do
response_channel.each do |response|
@request_dispatcher.add_response(response)
end
- rescue StandardError
- close
+ rescue Async::Stop, Async::TimeoutError, StandardError
+ close(check_requests: false)
end
+ logger.debug(self, "Connected")
end
- def close
+ # Before calling close the user must ensure that:
+ # 1) There are no ongoing requests
+ # 2) There will be no new writes after
+ def close(check_requests: true) # rubocop:disable Metrics/CyclomaticComplexity,Metrics/PerceivedComplexity
+ return if @closed
+
+ @closed = true
+
@transport&.close
- if @request_dispatcher&.requests&.any?
- raise ResourceLeakError, "Request list is not empty: #{@request_dispatcher.requests}"
- end
+ @transport&.wait
- reset!
+ @response_task&.stop
+ @response_task&.wait
+
+ return if @request_dispatcher&.requests&.empty?
+
+ @request_dispatcher.clear unless check_requests
+
+ raise ResourceLeakError, "Request list is not empty: #{@request_dispatcher.requests}" if check_requests
+ ensure
+ logger.debug(self, "Closed")
end
def connected?
@transport&.connected? || false
end
@@ -78,13 +102,13 @@
channel = @request_dispatcher.add_request(request)
@transport.write(request)
begin
channel.dequeue.flat_map { |item| Typing.cast(item) }
- rescue Async::Stop
- retry if @request_dispatcher.ongoing_request?(request_id)
- raise Grumlin::UnknownRequestStoppedError, "#{request_id} is not in the ongoing requests list"
+ rescue Async::Stop, Async::TimeoutError
+ close(check_requests: false)
+ raise
end
end
def inspect
"<#{self.class} url=#{@url} connected=#{connected?}>"
@@ -102,14 +126,9 @@
args: {
gremlin: Typing.to_bytecode(Translator.to_bytecode_query(message)),
aliases: { g: :g }
}
}
- end
-
- def reset!
- @request_dispatcher = nil
- @transport = nil
end
def build_transport
Transport.new(@url, parent: @parent, **@client_options)
end