lib/attr/gather/workflow/callable.rb in attr-gather-1.1.3 vs lib/attr/gather/workflow/callable.rb in attr-gather-1.2.0
- old
+ new
@@ -1,10 +1,7 @@
# frozen_string_literal: true
-require 'attr/gather/workflow/task_executor'
-require 'attr/gather/workflow/async_task_executor'
-
module Attr
module Gather
module Workflow
# @api private
module Callable
@@ -19,45 +16,42 @@
# enhancer = MyEnhancingWorkflow.new
# enhancer.call(user_id: 1).value! # => {user_id: 1, email: 't@t.co}
#
# @param input [Hash]
#
- # @return [Concurrent::Promise]
+ # @return [Concurrent::Promise<Hash>]
#
# @note For more information, check out {https://dry-rb.org/gems/dry-monads/1.0/result}
#
# @api public
def call(input)
- final_results = []
+ task_promises = {}
- each_task_batch.reduce(input.dup) do |aggregated_input, batch|
- executor_results = execute_batch(aggregated_input, batch)
- final_results << executor_results
- aggregator.call(aggregated_input, executor_results).value!
+ final_results = self.class.tasks.to_a.map do |task|
+ task_promises[task] = execute_task(input, task, task_promises)
end
- aggregator.call(input.dup, final_results.flatten(1))
+ Concurrent::Promise.zip(*final_results).then do |results|
+ aggregator.call(input, results)
+ end
end
private
- # Enumator for task batches
- #
- # @return [Enumerator]
- #
- # @api private
- def each_task_batch
- self.class.tasks.each_batch
- end
-
# Executes a batch of tasks
#
# @return [Array<TaskExecutionResult>]
#
# @api private
- def execute_batch(aggregated_input, batch)
- executor = AsyncTaskExecutor.new(batch, container: container)
- executor.call(aggregated_input)
+ def execute_task(initial_input, task, task_promises)
+ task_proc = container.resolve(task.name)
+ dep_promises = task.depends_on.map { |t| task_promises[t] }
+ input_promise = Concurrent::Promise.zip(*dep_promises)
+
+ input_promise.then do |results|
+ dep_input = aggregator.call(initial_input, results)
+ task_proc.call(dep_input)
+ end
end
# @api private
def container
self.class.container