require "concurrent"

module PeerCommander
  # Executes a given set of commands with a specified parallelism.
  #
  # Every command is run inside a Concurrent::Future. The executor keeps at
  # most +parallelism+ uncollected futures outstanding: once that many have
  # been submitted it polls until at least one has completed, harvests its
  # value, and only then submits the next command.
  #
  # Results are gathered as futures are harvested, so they are not guaranteed
  # to be in the same order as the submitted commands.
  #
  # NOTE(review): results are read with Future#value, which is expected to
  # yield nil for a future whose block raised, so a failing command would show
  # up as a nil entry rather than an exception. Confirm against the
  # concurrent-ruby documentation before relying on this.
  class ParallelExecutor
    # Seconds to sleep between polls while waiting for a slot to free up.
    SLEEP_DURATION = 0.2

    def initialize
      reset_state
    end

    # Runs every command, never holding more than +parallelism+ uncollected
    # futures at once, and returns the collected results.
    #
    # State is reset at the start of each call, so an executor instance can be
    # reused without results or futures from an earlier run leaking into a
    # later one.
    #
    # @param commands [Enumerable<#execute>] objects responding to +execute+
    # @param parallelism [Integer] maximum number of outstanding futures
    # @return [Array] the value produced by each command's future
    # @raise [ArgumentError] if +parallelism+ is less than 1
    def execute(commands, parallelism)
      raise ArgumentError, "Parallelism must be at least 1" if parallelism < 1

      @parallelism = parallelism
      reset_state

      commands.each do |command|
        wait_for_slot
        futures << Concurrent::Future.execute { command.execute }
      end

      wait_for_all
      command_results
    end

    private

    attr_reader :parallelism, :futures, :command_results

    # Starts a run with no outstanding futures and no collected results.
    def reset_state
      @futures = []
      @command_results = []
    end

    # Blocks until fewer than +parallelism+ futures are outstanding.
    def wait_for_slot
      until slot_available?
        # Harvest before sleeping so an already-finished future frees its slot
        # immediately instead of costing a full polling interval.
        remove_completed_commands
        sleep(SLEEP_DURATION) unless slot_available?
      end
    end

    # Collects the results of every future that is still outstanding.
    def wait_for_all
      # Future#value blocks until the future has completed, so no separate
      # polling loop is needed here.
      command_results.concat(extract_results_from(futures))
      @futures = []
    end

    # Moves the values of completed futures into the results and keeps only
    # the still-running futures as outstanding.
    def remove_completed_commands
      completed, pending = futures.partition(&:complete?)
      @futures = pending
      command_results.concat(extract_results_from(completed))
    end

    def slot_available?
      futures.size < parallelism
    end

    def extract_results_from(futures_to_extract)
      futures_to_extract.map(&:value)
    end
  end
end