lib/trailblazer/activity/dsl/linear/variable_mapping.rb in trailblazer-activity-dsl-linear-0.4.3 vs lib/trailblazer/activity/dsl/linear/variable_mapping.rb in trailblazer-activity-dsl-linear-0.5.0

- old
+ new

@@ -2,44 +2,175 @@ class Activity module DSL module Linear # Normalizer-steps to implement {:input} and {:output} # Returns an Extension instance to be thrown into the `step` DSL arguments. - def self.VariableMapping(input: VariableMapping.default_input, output: VariableMapping.default_output, output_with_outer_ctx: false) - input = - VariableMapping::Input::Scoped.new( - Trailblazer::Option(VariableMapping::filter_for(input)) - ) + def self.VariableMapping(input: nil, output: VariableMapping.default_output, output_with_outer_ctx: false, inject: []) + merge_instructions = VariableMapping.merge_instructions_from_dsl(input: input, output: output, output_with_outer_ctx: output_with_outer_ctx, inject: inject) - unscope_class = output_with_outer_ctx ? VariableMapping::Output::Unscoped::WithOuterContext : VariableMapping::Output::Unscoped - - output = - unscope_class.new( - Trailblazer::Option(VariableMapping::filter_for(output)) - ) - - TaskWrap::Extension( - merge: TaskWrap::VariableMapping.merge_for(input, output, id: input.object_id), # wraps filters: {Input(input), Output(output)} - ) + TaskWrap::Extension(merge: merge_instructions) end module VariableMapping module_function + # For the input filter we + # 1. create a separate {Pipeline} instance {pipe}. Depending on the user's options, this might have up to four steps. + # 2. The {pipe} is run in a lamdba {input}, the lambda returns the pipe's ctx[:input_ctx]. + # 3. The {input} filter in turn is wrapped into an {Activity::TaskWrap::Input} object via {#merge_instructions_for}. + # 4. The {TaskWrap::Input} instance is then finally placed into the taskWrap as {"task_wrap.input"}. + # # @private + def merge_instructions_from_dsl(input:, output:, output_with_outer_ctx:, inject:) + # FIXME: this could (should?) be in Normalizer? + inject_passthrough = inject.find_all { |name| name.is_a?(Symbol) } + inject_with_default = inject.find { |name| name.is_a?(Hash) } # FIXME: we only support one default hash in the DSL so far. + + input_steps = [ + ["input.init_hash", VariableMapping.method(:initial_input_hash)], + ] + + # With only injections defined, we do not filter out anything, we use the original ctx + # and _add_ defaulting for injected variables. + if !input # only injections defined + input_steps << ["input.default_input", VariableMapping.method(:default_input_ctx)] + end + + if input # :input or :input/:inject + input_steps << ["input.add_variables", VariableMapping.method(:add_variables)] + + input_filter = Trailblazer::Option(VariableMapping::filter_for(input)) + end + + if inject_passthrough || inject_with_default + input_steps << ["input.add_injections", VariableMapping.method(:add_injections)] # we now allow one filter per injected variable. + end + + if inject_passthrough || inject_with_default + injections = inject.collect do |name| + if name.is_a?(Symbol) + [[name, Trailblazer::Option(->(*) { [false, name] })]] # we don't want defaulting, this return value signalizes "please pass-through, only". + else # we automatically assume this is a hash of callables + name.collect do |_name, filter| + [_name, Trailblazer::Option(->(ctx, **kws) { [true, _name, filter.(ctx, **kws)] })] # filter will compute the default value + end + end + end.flatten(1).to_h + end + + input_steps << ["input.scope", VariableMapping.method(:scope)] + + + pipe = Activity::TaskWrap::Pipeline.new(input_steps) + + # gets wrapped by {VariableMapping::Input} and called there. + # API: @filter.([ctx, original_flow_options], **original_circuit_options) + # input = Trailblazer::Option(->(original_ctx, **) { }) + input = ->((ctx, flow_options), **circuit_options) do # This filter is called by {TaskWrap::Input#call} in the {activity} gem. + wrap_ctx, _ = pipe.({injections: injections, input_filter: input_filter}, [[ctx, flow_options], circuit_options]) + + wrap_ctx[:input_ctx] + end + + # 1. {} empty input hash + # 1. input # dynamic => hash + # 2. input_map => hash + # 3. inject => hash + # 4. Input::Scoped() + + unscope_class = output_with_outer_ctx ? VariableMapping::Output::Unscoped::WithOuterContext : VariableMapping::Output::Unscoped + + output = + unscope_class.new( + Trailblazer::Option(VariableMapping::filter_for(output)) + ) + + TaskWrap::VariableMapping.merge_instructions_for(input, output, id: input.object_id) # wraps filters: {Input(input), Output(output)} + end + +# DISCUSS: improvable sections such as merge vs hash[]= + def initial_input_hash(wrap_ctx, original_args) + wrap_ctx = wrap_ctx.merge(input_hash: {}) + + return wrap_ctx, original_args + end + + # Merge all original ctx variables into the new input_ctx. + # This happens when no {:input} is provided. + def default_input_ctx(wrap_ctx, original_args) + ((original_ctx, _), _) = original_args + + MergeVariables(original_ctx, wrap_ctx, original_args) + end + +# TODO: test {nil} default +# FIXME: what if you don't want inject but always the value from the config? + # Add injected variables if they're present on + # the original, incoming ctx. + def add_injections(wrap_ctx, original_args) + name2filter = wrap_ctx[:injections] + ((original_ctx, _), circuit_options) = original_args + + injections = + name2filter.collect do |name, filter| + # DISCUSS: should we remove {is_defaulted} and infer type from {filter} or the return value? + is_defaulted, new_name, default_value = filter.(original_ctx, keyword_arguments: original_ctx.to_hash, **circuit_options) # FIXME: interface? # {filter} exposes {Option} interface + + original_ctx.key?(name) ? + [new_name, original_ctx[name]] : ( + is_defaulted ? [new_name, default_value] : nil + ) + end.compact.to_h # FIXME: are we <2.6 safe here? + + MergeVariables(injections, wrap_ctx, original_args) + end + + # Implements {:input}. + def add_variables(wrap_ctx, original_args) + filter = wrap_ctx[:input_filter] + ((original_ctx, _), circuit_options) = original_args + + # this is the actual logic. + variables = filter.(original_ctx, keyword_arguments: original_ctx.to_hash, **circuit_options) + + MergeVariables(variables, wrap_ctx, original_args) + end + + # Finally, create a new input ctx from all the + # collected input variables. + # This goes into the step/nested OP. + def scope(wrap_ctx, original_args) + ((_, flow_options), _) = original_args + + # this is the actual context passed into the step. + wrap_ctx[:input_ctx] = Trailblazer::Context( + wrap_ctx[:input_hash], + {}, # mutable variables + flow_options[:context_options] + ) + + return wrap_ctx, original_args + end + + # Last call in every step. Currently replaces {:input_ctx} by adding variables using {#merge}. + # DISCUSS: improve here? + def MergeVariables(variables, wrap_ctx, original_args) + wrap_ctx[:input_hash] = wrap_ctx[:input_hash].merge(variables) + + return wrap_ctx, original_args + end + + # @private + # The default {:output} filter only returns the "mutable" part of the inner ctx. + # This means only variables added using {inner_ctx[..]=} are merged on the outside. def default_output ->(scoped, **) do _wrapped, mutable = scoped.decompose # `_wrapped` is what the `:input` filter returned, `mutable` is what the task wrote to `scoped`. mutable end end - # @private - def default_input - ->(ctx, **) { ctx } - end - # Returns a filter proc to be called in an Option. # @private def filter_for(filter) if filter.is_a?(::Array) || filter.is_a?(::Hash) DSL.filter_from_dsl(filter) @@ -72,46 +203,31 @@ return ary if ary.instance_of?(::Hash) Hash[ary.collect { |name| [name, name] }] end end - module Input - class Scoped - def initialize(filter) - @filter = filter - end - - def call((original_ctx, flow_options), **circuit_options) - Trailblazer::Context( - @filter.(original_ctx, keyword_arguments: original_ctx.to_hash, **circuit_options), - {}, - flow_options[:context_options] - ) - end - end - end - module Output # Merge the resulting {@filter.()} hash back into the original ctx. # DISCUSS: do we need the original_ctx as a filter argument? class Unscoped def initialize(filter) @filter = filter end + # The returned hash from {@filter} is merged with the original ctx. def call(new_ctx, (original_ctx, flow_options), **circuit_options) original_ctx.merge( call_filter(new_ctx, [original_ctx, flow_options], **circuit_options) ) end - def call_filter(new_ctx, (original_ctx, flow_options), **circuit_options) + def call_filter(new_ctx, (_original_ctx, _flow_options), **circuit_options) # Pass {inner_ctx, **inner_ctx} @filter.(new_ctx, keyword_arguments: new_ctx.to_hash, **circuit_options) end class WithOuterContext < Unscoped - def call_filter(new_ctx, (original_ctx, flow_options), **circuit_options) + def call_filter(new_ctx, (original_ctx, _flow_options), **circuit_options) # Pass {inner_ctx, outer_ctx, **inner_ctx} @filter.(new_ctx, original_ctx, keyword_arguments: new_ctx.to_hash, **circuit_options) end end end