lib/grumlin/client.rb in grumlin-0.2.0 vs lib/grumlin/client.rb in grumlin-0.3.0
- old
+ new
@@ -1,121 +1,102 @@
# frozen_string_literal: true
module Grumlin
class Client
- extend Forwardable
+ class PoolResource < self
+ attr :concurrency, :count
- SUCCESS = {
- 200 => :success,
- 204 => :no_content,
- 206 => :partial_content
- }.freeze
+ def self.call
+ new(Grumlin.config.url, concurrency: Grumlin.config.client_concurrency).tap(&:connect)
+ end
- ERRORS = {
- 499 => InvalidRequestArgumentsError,
- 500 => ServerError,
- 597 => ScriptEvaluationError,
- 599 => ServerSerializationError,
- 598 => ServerTimeoutError,
+ def initialize(url, concurrency: 1, parent: Async::Task.current)
+ super(url, parent: parent)
+ @concurrency = concurrency
+ @count = 0
+ end
- 401 => ClientSideError,
- 407 => ClientSideError,
- 498 => ClientSideError
- }.freeze
+ def viable?
+ connected?
+ end
- def initialize(url, autoconnect: true)
- @url = url
- @transport = Transport::Async.new(url)
- connect if autoconnect
- end
+ def closed?
+ connected?
+ end
- def_delegators :@transport, :connect, :disconnect, :requests
-
- # TODO: support yielding
- def submit(*args)
- request_id = SecureRandom.uuid
- queue = @transport.submit(to_query(request_id, args))
- wait_for_response(request_id, queue)
- ensure
- @transport.close_request(request_id)
+ def reusable?
+ true
+ end
end
- def inspect
- "<#{self.class} @url=#{@url}>"
+ def initialize(url, parent: Async::Task.current)
+ @parent = parent
+ @transport = Transport.new(url)
+ reset!
end
- alias to_s inspect
-
- private
-
- def wait_for_response(request_id, queue, result: []) # rubocop:disable Metrics/MethodLength
- queue.each do |status, response|
- check_errors!(request_id, status, response)
-
- case SUCCESS[response.dig(:status, :code)]
- when :success
- return result + Typing.cast(response.dig(:result, :data))
- when :partial_content then result += Typing.cast(response.dig(:result, :data))
- when :no_content
- return []
+ def connect
+ response_queue = @transport.connect
+ @request_dispatcher = RequestDispatcher.new
+ @parent.async do
+ response_queue.each do |response|
+ @request_dispatcher.add_response(response)
end
end
- rescue ::Async::Stop
- retry if @transport.ongoing_request?(request_id)
- raise UnknownRequestStopped, "#{request_id} is not in the ongoing requests list"
end
- def to_query(request_id, message)
- case message.first # TODO: properly handle unknown type of message
- when String
- string_query_message(request_id, *message)
- when Grumlin::Step
- bytecode_query_message(request_id, Translator.to_bytecode_query(message))
- end
+ def close
+ @transport.close
+ raise ResourceLeakError, "Request list is not empty: #{requests}" if @request_dispatcher.requests.any?
+
+ reset!
end
- def check_errors!(_request_id, status, response)
- reraise_error!(response) if status == :error
+ def connected?
+ @transport.connected?
+ end
- status = response[:status]
+ # TODO: support yielding
+ def write(*args) # rubocop:disable Metrics/MethodLength
+ request_id = SecureRandom.uuid
+ request = to_query(request_id, args)
+ queue = @request_dispatcher.add_request(request)
+ @transport.write(request)
- if (error = ERRORS[status[:code]])
- raise(error, status)
- end
+ begin
+ msg, response = queue.dequeue
+ raise response if msg == :error
- return unless SUCCESS[status[:code]].nil?
+ return response.flat_map { |item| Typing.cast(item) } if msg == :result
- raise(UnknownResponseStatus, status)
+ raise "ERROR"
+ rescue Async::Stop
+ retry if @request_dispatcher.ongoing_request?(request_id)
+ raise UnknownRequestStopped, "#{request_id} is not in the ongoing requests list"
+ end
end
- def reraise_error!(error)
- raise error
- rescue StandardError
- raise UnknownError
+ def inspect
+ "<#{self.class} url=#{@transport.url}>"
end
- def string_query_message(request_id, query, bindings)
- {
- requestId: request_id,
- op: "eval",
- processor: "",
- args: {
- gremlin: query,
- bindings: bindings,
- language: "gremlin-groovy"
- }
- }
- end
+ alias to_s inspect
- def bytecode_query_message(request_id, bytecode)
+ private
+
+ def to_query(request_id, message)
{
requestId: request_id,
op: "bytecode",
processor: "traversal",
args: {
- gremlin: Typing.to_bytecode(bytecode),
+ gremlin: Typing.to_bytecode(Translator.to_bytecode_query(message)),
aliases: { g: :g }
}
}
+ end
+
+ def reset!
+ @request_dispatcher = nil
end
end
end