lib/attr/gather/workflow/callable.rb in attr-gather-1.1.3 vs lib/attr/gather/workflow/callable.rb in attr-gather-1.2.0

- old
+ new

@@ -1,10 +1,7 @@ # frozen_string_literal: true -require 'attr/gather/workflow/task_executor' -require 'attr/gather/workflow/async_task_executor' - module Attr module Gather module Workflow # @api private module Callable @@ -19,45 +16,42 @@ # enhancer = MyEnhancingWorkflow.new # enhancer.call(user_id: 1).value! # => {user_id: 1, email: 't@t.co} # # @param input [Hash] # - # @return [Concurrent::Promise] + # @return [Concurrent::Promise<Hash>] # # @note For more information, check out {https://dry-rb.org/gems/dry-monads/1.0/result} # # @api public def call(input) - final_results = [] + task_promises = {} - each_task_batch.reduce(input.dup) do |aggregated_input, batch| - executor_results = execute_batch(aggregated_input, batch) - final_results << executor_results - aggregator.call(aggregated_input, executor_results).value! + final_results = self.class.tasks.to_a.map do |task| + task_promises[task] = execute_task(input, task, task_promises) end - aggregator.call(input.dup, final_results.flatten(1)) + Concurrent::Promise.zip(*final_results).then do |results| + aggregator.call(input, results) + end end private - # Enumator for task batches - # - # @return [Enumerator] - # - # @api private - def each_task_batch - self.class.tasks.each_batch - end - # Executes a batch of tasks # # @return [Array<TaskExecutionResult>] # # @api private - def execute_batch(aggregated_input, batch) - executor = AsyncTaskExecutor.new(batch, container: container) - executor.call(aggregated_input) + def execute_task(initial_input, task, task_promises) + task_proc = container.resolve(task.name) + dep_promises = task.depends_on.map { |t| task_promises[t] } + input_promise = Concurrent::Promise.zip(*dep_promises) + + input_promise.then do |results| + dep_input = aggregator.call(initial_input, results) + task_proc.call(dep_input) + end end # @api private def container self.class.container