lib/grumlin/request_dispatcher.rb in grumlin-0.18.1 vs lib/grumlin/request_dispatcher.rb in grumlin-0.19.0

- old
+ new

@@ -20,10 +20,15 @@ 401 => ClientSideError, 407 => ClientSideError, 498 => ClientSideError }.freeze + VERTEX_ALREADY_EXISTS = "Vertex with id already exists:" + EDGE_ALREADY_EXISTS = "Edge with id already exists:" + CONCURRENT_VERTEX_INSERT_FAILED = "Failed to complete Insert operation for a Vertex due to conflicting concurrent" + CONCURRENT_EDGE_INSERT_FAILED = "Failed to complete Insert operation for an Edge due to conflicting concurrent" + class DispatcherError < Grumlin::Error; end class RequestAlreadyAddedError < DispatcherError; end class UnknownRequestError < DispatcherError; end @@ -44,51 +49,71 @@ # TODO: sometimes response does not include requestID, no idea how to handle it so far. def add_response(response) # rubocop:disable Metrics/AbcSize request_id = response[:requestId] raise UnknownRequestError unless ongoing_request?(request_id) - request = @requests[request_id] + begin + request = @requests[request_id] - check_errors!(response[:status], request[:request]) + check_errors!(response[:status], request[:request]) - case SUCCESS[response.dig(:status, :code)] - when :success - request[:channel] << [*request[:result], response.dig(:result, :data)] + case SUCCESS[response.dig(:status, :code)] + when :success + request[:result] << response.dig(:result, :data) + request[:channel] << request[:result] + close_request(request_id) + when :partial_content then request[:result] << response.dig(:result, :data) + when :no_content + request[:channel] << [] + close_request(request_id) + end + rescue StandardError => e + request[:channel].exception(e) close_request(request_id) - when :partial_content then request[:result] << response.dig(:result, :data) - when :no_content - request[:channel] << [] - close_request(request_id) end - rescue StandardError => e - request[:channel].exception(e) - close_request(request_id) end - def close_request(request_id) - raise UnknownRequestError unless ongoing_request?(request_id) - - request = @requests.delete(request_id) - request[:channel].close - end - def ongoing_request?(request_id) @requests.key?(request_id) end def clear + @requests.each do |_id, request| + request[:channel].close! + end @requests.clear end private + def close_request(request_id) + raise UnknownRequestError unless ongoing_request?(request_id) + + request = @requests.delete(request_id) + request[:channel].close + end + def check_errors!(status, query) if (error = ERRORS[status[:code]]) - raise error.new(status, query) + raise ( + already_exists_error(status) || + concurrent_insert_error(status) || + error + ).new(status, query) end return unless SUCCESS[status[:code]].nil? raise(UnknownResponseStatus, status) + end + + def already_exists_error(status) + return VertexAlreadyExistsError if status[:message].include?(VERTEX_ALREADY_EXISTS) + return EdgeAlreadyExistsError if status[:message].include?(EDGE_ALREADY_EXISTS) + end + + def concurrent_insert_error(status) + return ConcurrentVertexInsertFailedError if status[:message].include?(CONCURRENT_VERTEX_INSERT_FAILED) + return ConcurrentEdgeInsertFailedError if status[:message].include?(CONCURRENT_EDGE_INSERT_FAILED) end end end