Sha256: 53fae8dddb6985402b3f61db4cabf5216005d8bedbfbe3f43349121b8f44695a

Contents?: true

Size: 1.5 KB

Versions: 2

Compression:

Stored size: 1.5 KB

Contents

# frozen_string_literal: true

require "concurrent-ruby"

module CobraCommander
  module Executor
    # A threaded execution environment, limited by the number of given workers
    class Execution < Hash
      #
      # @param jobs [Array<#call>] array of job objects
      # @param workers [Integer] number of workers to process this execution
      # @see CobraCommander::Executor::Job
      def initialize(jobs, workers:)
        super()

        @executor = Concurrent::FixedThreadPool.new(workers, auto_terminate: true)
        merge! create_futures(jobs)
      end

      # Wait for all jobs to complete, returns a future with all execution futures
      # @return [Concurrent::Promises::Future]
      def wait
        Concurrent::Promises.zip_futures_on(@executor, *values)
                            .tap(&:wait)
      end

      # The execution succeeds when all jobs succeeded
      # @return [Boolean]
      def success?
        values.all?(&:fulfilled?)
      end

    private

      def create_future(job)
        Concurrent::Promises.future_on(@executor, job, &:call).then do |result|
          status, output = result
          case status
          when :error then Concurrent::Promises.rejected_future(output)
          when :success, :skip then Concurrent::Promises.fulfilled_future(output)
          else
            Concurrent::Promises.fulfilled_future(result)
          end
        end.flat
      end

      def create_futures(jobs)
        jobs.to_h { |job| [job, create_future(job)] }
      end
    end
  end
end

Version data entries

2 entries across 2 versions & 1 rubygems

Version Path
cobra_commander-1.0.1 lib/cobra_commander/executor/execution.rb
cobra_commander-1.0.0 lib/cobra_commander/executor/execution.rb