lib/grumlin/client.rb in grumlin-0.2.0 vs lib/grumlin/client.rb in grumlin-0.3.0

- old
+ new

@@ -1,121 +1,102 @@ # frozen_string_literal: true module Grumlin class Client - extend Forwardable + class PoolResource < self + attr :concurrency, :count - SUCCESS = { - 200 => :success, - 204 => :no_content, - 206 => :partial_content - }.freeze + def self.call + new(Grumlin.config.url, concurrency: Grumlin.config.client_concurrency).tap(&:connect) + end - ERRORS = { - 499 => InvalidRequestArgumentsError, - 500 => ServerError, - 597 => ScriptEvaluationError, - 599 => ServerSerializationError, - 598 => ServerTimeoutError, + def initialize(url, concurrency: 1, parent: Async::Task.current) + super(url, parent: parent) + @concurrency = concurrency + @count = 0 + end - 401 => ClientSideError, - 407 => ClientSideError, - 498 => ClientSideError - }.freeze + def viable? + connected? + end - def initialize(url, autoconnect: true) - @url = url - @transport = Transport::Async.new(url) - connect if autoconnect - end + def closed? + connected? + end - def_delegators :@transport, :connect, :disconnect, :requests - - # TODO: support yielding - def submit(*args) - request_id = SecureRandom.uuid - queue = @transport.submit(to_query(request_id, args)) - wait_for_response(request_id, queue) - ensure - @transport.close_request(request_id) + def reusable? + true + end end - def inspect - "<#{self.class} @url=#{@url}>" + def initialize(url, parent: Async::Task.current) + @parent = parent + @transport = Transport.new(url) + reset! end - alias to_s inspect - - private - - def wait_for_response(request_id, queue, result: []) # rubocop:disable Metrics/MethodLength - queue.each do |status, response| - check_errors!(request_id, status, response) - - case SUCCESS[response.dig(:status, :code)] - when :success - return result + Typing.cast(response.dig(:result, :data)) - when :partial_content then result += Typing.cast(response.dig(:result, :data)) - when :no_content - return [] + def connect + response_queue = @transport.connect + @request_dispatcher = RequestDispatcher.new + @parent.async do + response_queue.each do |response| + @request_dispatcher.add_response(response) end end - rescue ::Async::Stop - retry if @transport.ongoing_request?(request_id) - raise UnknownRequestStopped, "#{request_id} is not in the ongoing requests list" end - def to_query(request_id, message) - case message.first # TODO: properly handle unknown type of message - when String - string_query_message(request_id, *message) - when Grumlin::Step - bytecode_query_message(request_id, Translator.to_bytecode_query(message)) - end + def close + @transport.close + raise ResourceLeakError, "Request list is not empty: #{requests}" if @request_dispatcher.requests.any? + + reset! end - def check_errors!(_request_id, status, response) - reraise_error!(response) if status == :error + def connected? + @transport.connected? + end - status = response[:status] + # TODO: support yielding + def write(*args) # rubocop:disable Metrics/MethodLength + request_id = SecureRandom.uuid + request = to_query(request_id, args) + queue = @request_dispatcher.add_request(request) + @transport.write(request) - if (error = ERRORS[status[:code]]) - raise(error, status) - end + begin + msg, response = queue.dequeue + raise response if msg == :error - return unless SUCCESS[status[:code]].nil? + return response.flat_map { |item| Typing.cast(item) } if msg == :result - raise(UnknownResponseStatus, status) + raise "ERROR" + rescue Async::Stop + retry if @request_dispatcher.ongoing_request?(request_id) + raise UnknownRequestStopped, "#{request_id} is not in the ongoing requests list" + end end - def reraise_error!(error) - raise error - rescue StandardError - raise UnknownError + def inspect + "<#{self.class} url=#{@transport.url}>" end - def string_query_message(request_id, query, bindings) - { - requestId: request_id, - op: "eval", - processor: "", - args: { - gremlin: query, - bindings: bindings, - language: "gremlin-groovy" - } - } - end + alias to_s inspect - def bytecode_query_message(request_id, bytecode) + private + + def to_query(request_id, message) { requestId: request_id, op: "bytecode", processor: "traversal", args: { - gremlin: Typing.to_bytecode(bytecode), + gremlin: Typing.to_bytecode(Translator.to_bytecode_query(message)), aliases: { g: :g } } } + end + + def reset! + @request_dispatcher = nil end end end