lib/grumlin/client.rb in grumlin-0.1.3 vs lib/grumlin/client.rb in grumlin-0.2.0
- old
+ new
@@ -1,13 +1,17 @@
# frozen_string_literal: true
module Grumlin
- class Client # rubocop:disable Metrics/ClassLength
- SUCCESS_STATUS = 200
- NO_CONTENT_STATUS = 204
- PARTIAL_CONTENT_STATUS = 206
+ class Client
+ extend Forwardable
+ SUCCESS = {
+ 200 => :success,
+ 204 => :no_content,
+ 206 => :partial_content
+ }.freeze
+
ERRORS = {
499 => InvalidRequestArgumentsError,
500 => ServerError,
597 => ScriptEvaluationError,
599 => ServerSerializationError,
@@ -16,157 +20,102 @@
401 => ClientSideError,
407 => ClientSideError,
498 => ClientSideError
}.freeze
- def initialize(url, task: Async::Task.current, autoconnect: true, mode: :bytecode)
- @task = task
- @endpoint = Async::HTTP::Endpoint.parse(url)
- @mode = mode
-
- @requests = {}
- @query_queue = Async::Queue.new
-
+ def initialize(url, autoconnect: true)
+ @url = url
+ @transport = Transport::Async.new(url)
connect if autoconnect
end
- def connect # rubocop:disable Metrics/MethodLength
- raise AlreadyConnectedError unless @connection_task.nil?
+ def_delegators :@transport, :connect, :disconnect, :requests
- @connection_task = @task.async do |subtask|
- Async::WebSocket::Client.connect(@endpoint) do |connection|
- subtask.async { query_task(connection) }
- response_task(connection)
- end
- rescue StandardError => e
- @requests.each_value do |queue|
- queue << [:error, e]
- end
- disconnect
- end
+ # TODO: support yielding
+ def submit(*args)
+ request_id = SecureRandom.uuid
+ queue = @transport.submit(to_query(request_id, args))
+ wait_for_response(request_id, queue)
+ ensure
+ @transport.close_request(request_id)
end
- def disconnect
- raise NotConnectedError if @connection_task.nil?
-
- @connection_task&.stop
- @connection_task&.wait
- @connection_task = nil
- @requests = {}
+ def inspect
+ "<#{self.class} @url=#{@url}>"
end
- def query(*args) # rubocop:disable Metrics/MethodLength
- response_queue, request_id = schedule_query(args)
- result = []
+ alias to_s inspect
- response_queue.each do |status, response|
- reraise_error!(response) if status == :error
+ private
- status = response[:status]
+ def wait_for_response(request_id, queue, result: []) # rubocop:disable Metrics/MethodLength
+ queue.each do |status, response|
+ check_errors!(request_id, status, response)
- if status[:code] == NO_CONTENT_STATUS
- close_request(request_id)
+ case SUCCESS[response.dig(:status, :code)]
+ when :success
+ return result + Typing.cast(response.dig(:result, :data))
+ when :partial_content then result += Typing.cast(response.dig(:result, :data))
+ when :no_content
return []
end
-
- check_errors!(status, request_id)
-
- page = Typing.cast(response.dig(:result, :data))
-
- case status[:code]
- when SUCCESS_STATUS
- close_request(request_id)
- return result + page
- when PARTIAL_CONTENT_STATUS
- result += page
- else
- raise UnknownResponseStatus, status
- end
end
+ rescue ::Async::Stop
+ retry if @transport.ongoing_request?(request_id)
+ raise UnknownRequestStopped, "#{request_id} is not in the ongoing requests list"
end
- private
-
- def schedule_query(args)
- uuid = SecureRandom.uuid
- queue = Async::Queue.new
- @requests[uuid] = queue
- @query_queue << to_query(uuid, args)
-
- [queue, uuid]
- end
-
- def to_query(uuid, message)
- case message.first
+ def to_query(request_id, message)
+ case message.first # TODO: properly handle unknown type of message
when String
- string_query_message(uuid, *message)
+ string_query_message(request_id, *message)
when Grumlin::Step
- build_query(uuid, message)
+ bytecode_query_message(request_id, Translator.to_bytecode_query(message))
end
end
- def check_errors!(status, request_id)
- error = ERRORS[status[:code]]
- close_request(request_id)
- raise(error, status) if error
- end
+ def check_errors!(_request_id, status, response)
+ reraise_error!(response) if status == :error
- def close_request(request_id)
- @requests.delete(request_id)
+ status = response[:status]
+
+ if (error = ERRORS[status[:code]])
+ raise(error, status)
+ end
+
+ return unless SUCCESS[status[:code]].nil?
+
+ raise(UnknownResponseStatus, status)
end
def reraise_error!(error)
raise error
rescue StandardError
- raise ConnectionError
+ raise UnknownError
end
- def query_task(connection)
- loop do
- connection.write @query_queue.dequeue
- connection.flush
- end
- end
-
- def response_task(connection)
- loop do
- response = connection.read
- response_queue = @requests[response[:requestId]]
- response_queue << [:response, response]
- end
- end
-
- def string_query_message(uuid, query, bindings)
+ def string_query_message(request_id, query, bindings)
{
- requestId: uuid,
+ requestId: request_id,
op: "eval",
processor: "",
args: {
gremlin: query,
bindings: bindings,
language: "gremlin-groovy"
}
}
end
- def bytecode_query_message(uuid, bytecode)
+ def bytecode_query_message(request_id, bytecode)
{
- requestId: uuid,
+ requestId: request_id,
op: "bytecode",
processor: "traversal",
args: {
- gremlin: { "@type": "g:Bytecode", "@value": { step: bytecode } },
+ gremlin: Typing.to_bytecode(bytecode),
aliases: { g: :g }
}
}
- end
-
- def build_query(uuid, steps)
- case @mode
- when :string
- string_query_message(uuid, *Translator.to_string_query(steps))
- else
- bytecode_query_message(uuid, Translator.to_bytecode_query(steps))
- end
end
end
end