Sha256: af049a597eb2b3d3bed8a00ed1053744b8c97e03dc93d997176248895ced5758

Contents?: true

Size: 1.2 KB

Versions: 2

Compression:

Stored size: 1.2 KB

Contents

module Flows
  class SharedContextPipeline
    EMPTY_ARRAY = [].freeze

    # @api private
    Step = Struct.new(:name, :lambda, :router_def, :next_step, keyword_init: true) do
      def to_node(pipeline_class)
        klass = self.class

        Flows::Flow::Node.new(
          body: lambda || pipeline_class.method(name),
          router: router_def.to_router(next_step),
          meta: { name: name },
          preprocessor: klass::NODE_PREPROCESSOR,
          postprocessor: klass::NODE_POSTPROCESSOR
        )
      end
    end

    Step.const_set(
      :NODE_PREPROCESSOR,
      lambda do |_input, context, node_meta|
        context[:class].before_each_callbacks.each do |callback|
          callback.call(context[:class], node_meta[:name], context[:data], context[:meta])
        end

        [EMPTY_ARRAY, context[:data]]
      end
    )

    Step.const_set(
      :NODE_POSTPROCESSOR,
      lambda do |result, context, node_meta|
        context[:data].merge!(result.instance_variable_get(:@data))

        context[:class].after_each_callbacks.each do |callback|
          callback.call(context[:class], node_meta[:name], result, context[:data], context[:meta])
        end

        result
      end
    )
  end
end

Version data entries

2 entries across 2 versions & 1 rubygems

Version Path
flows-0.5.1 lib/flows/shared_context_pipeline/step.rb
flows-0.5.0 lib/flows/shared_context_pipeline/step.rb