lib/graphql/stitching/executor.rb in graphql-stitching-0.3.6 vs lib/graphql/stitching/executor.rb in graphql-stitching-1.0.0

- old
+ new

@@ -3,236 +3,10 @@
 require "json"

 module GraphQL
   module Stitching
     class Executor
-
-      class RootSource < GraphQL::Dataloader::Source
-        def initialize(executor, location)
-          @executor = executor
-          @location = location
-        end
-
-        def fetch(ops)
-          op = ops.first # There should only ever be one per location at a time
-
-          query_document = build_document(op, @executor.request.operation_name)
-          query_variables = @executor.request.variables.slice(*op["variables"].keys)
-          result = @executor.supergraph.execute_at_location(op["location"], query_document, query_variables, @executor.request.context)
-          @executor.query_count += 1
-
-          @executor.data.merge!(result["data"]) if result["data"]
-          if result["errors"]&.any?
-            result["errors"].each { _1.delete("locations") }
-            @executor.errors.concat(result["errors"])
-          end
-
-          ops.map { op["order"] }
-        end
-
-        # Builds root source documents
-        # "query MyOperation_1($var:VarType) { rootSelections ... }"
-        def build_document(op, operation_name = nil)
-          doc = String.new
-          doc << op["operation_type"]
-
-          if operation_name
-            doc << " #{operation_name}_#{op["order"]}"
-          end
-
-          if op["variables"].any?
-            variable_defs = op["variables"].map { |k, v| "$#{k}:#{v}" }.join(",")
-            doc << "(#{variable_defs})"
-          end
-
-          doc << op["selections"]
-          doc
-        end
-      end
-
-      class BoundarySource < GraphQL::Dataloader::Source
-        def initialize(executor, location)
-          @executor = executor
-          @location = location
-        end
-
-        def fetch(ops)
-          origin_sets_by_operation = ops.each_with_object({}) do |op, memo|
-            origin_set = op["path"].reduce([@executor.data]) do |set, path_segment|
-              set.flat_map { |obj| obj && obj[path_segment] }.tap(&:compact!)
-            end
-
-            if op["if_type"]
-              # operations planned around unused fragment conditions should not trigger requests
-              origin_set.select! { _1["_STITCH_typename"] == op["if_type"] }
-            end
-
-            memo[op] = origin_set if origin_set.any?
-          end
-
-          if origin_sets_by_operation.any?
-            query_document, variable_names = build_document(origin_sets_by_operation, @executor.request.operation_name)
-            variables = @executor.request.variables.slice(*variable_names)
-            raw_result = @executor.supergraph.execute_at_location(@location, query_document, variables, @executor.request.context)
-            @executor.query_count += 1
-
-            merge_results!(origin_sets_by_operation, raw_result.dig("data"))
-
-            errors = raw_result.dig("errors")
-            @executor.errors.concat(extract_errors!(origin_sets_by_operation, errors)) if errors&.any?
-          end
-
-          ops.map { origin_sets_by_operation[_1] ? _1["order"] : nil }
-        end
-
-        # Builds batched boundary queries
-        # "query MyOperation_2_3($var:VarType) {
-        #   _0_result: list(keys:["a","b","c"]) { boundarySelections... }
-        #   _1_0_result: item(key:"x") { boundarySelections... }
-        #   _1_1_result: item(key:"y") { boundarySelections... }
-        #   _1_2_result: item(key:"z") { boundarySelections... }
-        # }"
-        def build_document(origin_sets_by_operation, operation_name = nil)
-          variable_defs = {}
-          query_fields = origin_sets_by_operation.map.with_index do |(op, origin_set), batch_index|
-            variable_defs.merge!(op["variables"])
-            boundary = op["boundary"]
-            key_selection = "_STITCH_#{boundary["key"]}"
-
-            if boundary["list"]
-              input = JSON.generate(origin_set.map { _1[key_selection] })
-              "_#{batch_index}_result: #{boundary["field"]}(#{boundary["arg"]}:#{input}) #{op["selections"]}"
-            else
-              origin_set.map.with_index do |origin_obj, index|
-                input = JSON.generate(origin_obj[key_selection])
-                "_#{batch_index}_#{index}_result: #{boundary["field"]}(#{boundary["arg"]}:#{input}) #{op["selections"]}"
-              end
-            end
-          end
-
-          doc = String.new
-          doc << "query" # boundary fulfillment always uses query
-
-          if operation_name
-            doc << " #{operation_name}"
-            origin_sets_by_operation.each_key do |op|
-              doc << "_#{op["order"]}"
-            end
-          end
-
-          if variable_defs.any?
-            variable_str = variable_defs.map { |k, v| "$#{k}:#{v}" }.join(",")
-            doc << "(#{variable_str})"
-          end
-
-          doc << "{ #{query_fields.join(" ")} }"
-
-          return doc, variable_defs.keys
-        end
-
-        def merge_results!(origin_sets_by_operation, raw_result)
-          return unless raw_result
-
-          origin_sets_by_operation.each_with_index do |(op, origin_set), batch_index|
-            results = if op.dig("boundary", "list")
-              raw_result["_#{batch_index}_result"]
-            else
-              origin_set.map.with_index { |_, index| raw_result["_#{batch_index}_#{index}_result"] }
-            end
-
-            next unless results&.any?
-
-            origin_set.each_with_index do |origin_obj, index|
-              origin_obj.merge!(results[index]) if results[index]
-            end
-          end
-        end
-
-        # https://spec.graphql.org/June2018/#sec-Errors
-        def extract_errors!(origin_sets_by_operation, errors)
-          ops = origin_sets_by_operation.keys
-          origin_sets = origin_sets_by_operation.values
-          pathed_errors_by_op_index_and_object_id = {}
-
-          errors_result = errors.each_with_object([]) do |err, memo|
-            err.delete("locations")
-            path = err["path"]
-
-            if path && path.length > 0
-              result_alias = /^_(\d+)(?:_(\d+))?_result$/.match(path.first.to_s)
-
-              if result_alias
-                path = err["path"] = path[1..-1]
-
-                origin_obj = if result_alias[2]
-                  origin_sets.dig(result_alias[1].to_i, result_alias[2].to_i)
-                elsif path[0].is_a?(Integer) || /\d+/.match?(path[0].to_s)
-                  origin_sets.dig(result_alias[1].to_i, path.shift.to_i)
-                end
-
-                if origin_obj
-                  by_op_index = pathed_errors_by_op_index_and_object_id[result_alias[1].to_i] ||= {}
-                  by_object_id = by_op_index[origin_obj.object_id] ||= []
-                  by_object_id << err
-                  next
-                end
-              end
-            end
-
-            memo << err
-          end
-
-          if pathed_errors_by_op_index_and_object_id.any?
-            pathed_errors_by_op_index_and_object_id.each do |op_index, pathed_errors_by_object_id|
-              repath_errors!(pathed_errors_by_object_id, ops.dig(op_index, "path"))
-              errors_result.concat(pathed_errors_by_object_id.values)
-            end
-          end
-          errors_result.flatten!
-        end
-
-        private
-
-        # traverse forward through origin data, expanding arrays to follow all paths
-        # any errors found for an origin object_id have their path prefixed by the object path
-        def repath_errors!(pathed_errors_by_object_id, forward_path, current_path=[], root=@executor.data)
-          current_path.push(forward_path.shift)
-          scope = root[current_path.last]
-
-          if forward_path.any? && scope.is_a?(Array)
-            scope.each_with_index do |element, index|
-              inner_elements = element.is_a?(Array) ? element.flatten : [element]
-              inner_elements.each do |inner_element|
-                current_path << index
-                repath_errors!(pathed_errors_by_object_id, forward_path, current_path, inner_element)
-                current_path.pop
-              end
-            end
-
-          elsif forward_path.any?
-            repath_errors!(pathed_errors_by_object_id, forward_path, current_path, scope)
-
-          elsif scope.is_a?(Array)
-            scope.each_with_index do |element, index|
-              inner_elements = element.is_a?(Array) ? element.flatten : [element]
-              inner_elements.each do |inner_element|
-                errors = pathed_errors_by_object_id[inner_element.object_id]
-                errors.each { _1["path"] = [*current_path, index, *_1["path"]] } if errors
-              end
-            end
-
-          else
-            errors = pathed_errors_by_object_id[scope.object_id]
-            errors.each { _1["path"] = [*current_path, *_1["path"]] } if errors
-          end
-
-          forward_path.unshift(current_path.pop)
-        end
-      end
-
       attr_reader :supergraph, :request, :data, :errors
       attr_accessor :query_count

       def initialize(supergraph:, request:, plan:, nonblocking: false)
         @supergraph = supergraph
@@ -295,5 +69,8 @@
         exec!(next_ordinals) if next_ordinals.any?
       end
     end
   end
 end
+
+require_relative "./executor/boundary_source"
+require_relative "./executor/root_source"
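
Note on the refactor: the RootSource and BoundarySource classes removed above are not deleted in 1.0.0; they are extracted into their own files and loaded through the new require_relative lines at the bottom of the diff. A minimal sketch of the presumed layout of lib/graphql/stitching/executor/root_source.rb, inferred from the require paths and the removed class definitions (the method bodies are the same code removed above, not shown here):

# lib/graphql/stitching/executor/root_source.rb (presumed layout, inferred)
# frozen_string_literal: true

module GraphQL
  module Stitching
    class Executor
      class RootSource < GraphQL::Dataloader::Source
        def initialize(executor, location)
          @executor = executor
          @location = location
        end

        # fetch and build_document move here verbatim from the removed
        # Executor code above, keeping the same constant path:
        # GraphQL::Stitching::Executor::RootSource
      end
    end
  end
end

BoundarySource presumably follows the same pattern in lib/graphql/stitching/executor/boundary_source.rb, so existing references to both constants continue to resolve unchanged.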