Sha256: 1680ead74e69acf2af15085b10284e6ac2f7f01a3fcc12b47a7c614d06c3f464

Contents?: true

Size: 1.35 KB

Versions: 1

Compression:

Stored size: 1.35 KB

Contents

require "concurrent"

module PeerCommander
  # Executes a given set of commands with a specified parallelism
  class ParallelExecutor
    SLEEP_DURATION = 0.2

    def initialize
      @futures = []
      @command_results = []
    end

    def execute(commands, parallelism)
      raise ArgumentError, "Parallelism must be at least 1" if parallelism < 1

      @parallelism = parallelism

      commands.each do |command|
        wait_for_slot if all_slots_filled?

        @futures << Concurrent::Future.execute { command.execute }
      end

      wait_for_all

      command_results
    end

    private

    attr_reader :parallelism, :futures, :command_results, :future_command_map

    def wait_for_slot
      while all_slots_filled?
        sleep(SLEEP_DURATION)
        remove_completed_commands
      end
    end

    def wait_for_all
      sleep(SLEEP_DURATION) while futures.any?(&:incomplete?)
      @command_results.push(*extract_results_from(futures))
    end

    def remove_completed_commands
      completed = futures.select(&:complete?)
      @futures -= completed
      @command_results.push(*extract_results_from(completed))
    end

    def slot_available?
      futures.size < parallelism
    end

    def all_slots_filled?
      !slot_available?
    end

    def extract_results_from(futures_to_extract)
      futures_to_extract.map(&:value)
    end
  end
end

Version data entries

1 entries across 1 versions & 1 rubygems

Version Path
peer_commander-0.1.0 lib/peer_commander/parallel_executor.rb