Sha256: 1680ead74e69acf2af15085b10284e6ac2f7f01a3fcc12b47a7c614d06c3f464
Contents?: true
Size: 1.35 KB
Versions: 1
Compression:
Stored size: 1.35 KB
Contents
require "concurrent"

module PeerCommander
  # Executes a given set of commands with a specified parallelism.
  #
  # Each command is run inside a Concurrent::Future; at most +parallelism+
  # futures are outstanding at any time. An instance keeps per-run state in
  # instance variables without synchronization, so a single instance must not
  # run two #execute calls concurrently, but it may be reused sequentially.
  class ParallelExecutor
    # Seconds to pause between polls while every slot is still occupied.
    SLEEP_DURATION = 0.2

    def initialize
      reset_state
    end

    # Runs every command, keeping at most +parallelism+ in flight at once.
    #
    # @param commands [Enumerable<#execute>] objects responding to +execute+
    # @param parallelism [Integer] maximum number of commands in flight (>= 1)
    # @return [Array] the value of each command's +execute+ call. Results
    #   harvested while waiting for a free slot come first (in the order they
    #   were harvested), followed by the remaining ones in submission order.
    #   NOTE: results are read with Future#value, so a command whose +execute+
    #   raises contributes +nil+ instead of propagating the error.
    # @raise [ArgumentError] if +parallelism+ is less than 1
    def execute(commands, parallelism)
      raise ArgumentError, "Parallelism must be at least 1" if parallelism < 1

      @parallelism = parallelism
      # Start from a clean slate so that results and futures left over from a
      # previous run on this instance are never reported again. Fresh arrays
      # (rather than #clear) keep previously returned result arrays intact.
      reset_state

      commands.each do |command|
        wait_for_slot
        @futures << Concurrent::Future.execute { command.execute }
      end

      wait_for_all
      command_results
    end

    private

    attr_reader :parallelism, :futures, :command_results

    # Drops all per-run bookkeeping.
    def reset_state
      @futures = []
      @command_results = []
    end

    # Blocks until fewer than +parallelism+ futures are outstanding.
    # Finished futures are harvested before sleeping, so an already-completed
    # command frees its slot immediately instead of after a full poll interval.
    def wait_for_slot
      while all_slots_filled?
        remove_completed_commands
        sleep(SLEEP_DURATION) if all_slots_filled?
      end
    end

    # Blocks until every outstanding future has finished, then collects the
    # remaining results in submission order.
    def wait_for_all
      futures.each(&:wait)
      @command_results.concat(extract_results_from(futures))
      @futures = []
    end

    # Moves the results of finished futures into the result list and frees
    # their slots.
    def remove_completed_commands
      completed, pending = futures.partition(&:complete?)
      @futures = pending
      @command_results.concat(extract_results_from(completed))
    end

    def slot_available?
      futures.size < parallelism
    end

    def all_slots_filled?
      !slot_available?
    end

    # @param futures_to_extract [Array<Concurrent::Future>] completed futures
    # @return [Array] their values (+nil+ for a future whose block raised)
    def extract_results_from(futures_to_extract)
      futures_to_extract.map(&:value)
    end
  end
end
Version data entries
1 entries across 1 versions & 1 rubygems
Version | Path |
---|---|
peer_commander-0.1.0 | lib/peer_commander/parallel_executor.rb |