lib/grumlin/client.rb in grumlin-0.1.3 vs lib/grumlin/client.rb in grumlin-0.2.0

- old
+ new

@@ -1,13 +1,17 @@ # frozen_string_literal: true module Grumlin - class Client # rubocop:disable Metrics/ClassLength - SUCCESS_STATUS = 200 - NO_CONTENT_STATUS = 204 - PARTIAL_CONTENT_STATUS = 206 + class Client + extend Forwardable + SUCCESS = { + 200 => :success, + 204 => :no_content, + 206 => :partial_content + }.freeze + ERRORS = { 499 => InvalidRequestArgumentsError, 500 => ServerError, 597 => ScriptEvaluationError, 599 => ServerSerializationError, @@ -16,157 +20,102 @@ 401 => ClientSideError, 407 => ClientSideError, 498 => ClientSideError }.freeze - def initialize(url, task: Async::Task.current, autoconnect: true, mode: :bytecode) - @task = task - @endpoint = Async::HTTP::Endpoint.parse(url) - @mode = mode - - @requests = {} - @query_queue = Async::Queue.new - + def initialize(url, autoconnect: true) + @url = url + @transport = Transport::Async.new(url) connect if autoconnect end - def connect # rubocop:disable Metrics/MethodLength - raise AlreadyConnectedError unless @connection_task.nil? + def_delegators :@transport, :connect, :disconnect, :requests - @connection_task = @task.async do |subtask| - Async::WebSocket::Client.connect(@endpoint) do |connection| - subtask.async { query_task(connection) } - response_task(connection) - end - rescue StandardError => e - @requests.each_value do |queue| - queue << [:error, e] - end - disconnect - end + # TODO: support yielding + def submit(*args) + request_id = SecureRandom.uuid + queue = @transport.submit(to_query(request_id, args)) + wait_for_response(request_id, queue) + ensure + @transport.close_request(request_id) end - def disconnect - raise NotConnectedError if @connection_task.nil? - - @connection_task&.stop - @connection_task&.wait - @connection_task = nil - @requests = {} + def inspect + "<#{self.class} @url=#{@url}>" end - def query(*args) # rubocop:disable Metrics/MethodLength - response_queue, request_id = schedule_query(args) - result = [] + alias to_s inspect - response_queue.each do |status, response| - reraise_error!(response) if status == :error + private - status = response[:status] + def wait_for_response(request_id, queue, result: []) # rubocop:disable Metrics/MethodLength + queue.each do |status, response| + check_errors!(request_id, status, response) - if status[:code] == NO_CONTENT_STATUS - close_request(request_id) + case SUCCESS[response.dig(:status, :code)] + when :success + return result + Typing.cast(response.dig(:result, :data)) + when :partial_content then result += Typing.cast(response.dig(:result, :data)) + when :no_content return [] end - - check_errors!(status, request_id) - - page = Typing.cast(response.dig(:result, :data)) - - case status[:code] - when SUCCESS_STATUS - close_request(request_id) - return result + page - when PARTIAL_CONTENT_STATUS - result += page - else - raise UnknownResponseStatus, status - end end + rescue ::Async::Stop + retry if @transport.ongoing_request?(request_id) + raise UnknownRequestStopped, "#{request_id} is not in the ongoing requests list" end - private - - def schedule_query(args) - uuid = SecureRandom.uuid - queue = Async::Queue.new - @requests[uuid] = queue - @query_queue << to_query(uuid, args) - - [queue, uuid] - end - - def to_query(uuid, message) - case message.first + def to_query(request_id, message) + case message.first # TODO: properly handle unknown type of message when String - string_query_message(uuid, *message) + string_query_message(request_id, *message) when Grumlin::Step - build_query(uuid, message) + bytecode_query_message(request_id, Translator.to_bytecode_query(message)) end end - def check_errors!(status, request_id) - error = ERRORS[status[:code]] - close_request(request_id) - raise(error, status) if error - end + def check_errors!(_request_id, status, response) + reraise_error!(response) if status == :error - def close_request(request_id) - @requests.delete(request_id) + status = response[:status] + + if (error = ERRORS[status[:code]]) + raise(error, status) + end + + return unless SUCCESS[status[:code]].nil? + + raise(UnknownResponseStatus, status) end def reraise_error!(error) raise error rescue StandardError - raise ConnectionError + raise UnknownError end - def query_task(connection) - loop do - connection.write @query_queue.dequeue - connection.flush - end - end - - def response_task(connection) - loop do - response = connection.read - response_queue = @requests[response[:requestId]] - response_queue << [:response, response] - end - end - - def string_query_message(uuid, query, bindings) + def string_query_message(request_id, query, bindings) { - requestId: uuid, + requestId: request_id, op: "eval", processor: "", args: { gremlin: query, bindings: bindings, language: "gremlin-groovy" } } end - def bytecode_query_message(uuid, bytecode) + def bytecode_query_message(request_id, bytecode) { - requestId: uuid, + requestId: request_id, op: "bytecode", processor: "traversal", args: { - gremlin: { "@type": "g:Bytecode", "@value": { step: bytecode } }, + gremlin: Typing.to_bytecode(bytecode), aliases: { g: :g } } } - end - - def build_query(uuid, steps) - case @mode - when :string - string_query_message(uuid, *Translator.to_string_query(steps)) - else - bytecode_query_message(uuid, Translator.to_bytecode_query(steps)) - end end end end