lib/multi_process/group.rb in multi_process-1.1.1 vs lib/multi_process/group.rb in multi_process-1.2.0

- old
+ new

@@ -1,5 +1,7 @@ +# frozen_string_literal: true + module MultiProcess # # Store and run a group of processes. # class Group @@ -22,11 +24,11 @@ # @option otps [ Receiver ] :receiver Receiver to use for new added # processes. Defaults to `MultiProcess::Logger.global`. # def initialize(receiver: nil, partition: nil) @processes = [] - @receiver = receiver ? receiver : MultiProcess::Logger.global + @receiver = receiver || MultiProcess::Logger.global @partition = partition ? partition.to_i : 0 @mutex = Mutex.new end # Add new process or list of processes. @@ -85,35 +87,65 @@ else processes.each(&:wait) end end + # Wait until all process terminated. + # + # Raise an error if a process exists unsuccessfully. + # + # @param opts [ Hash ] Options. + # @option opts [ Integer ] :timeout Timeout in seconds to wait before raising {Timeout::Error}. + # + def wait!(timeout: nil) + if timeout + ::Timeout.timeout(timeout) { wait! } + else + processes.each(&:wait!) + end + end + # Start all process and wait for them to terminate. # # Given options will be passed to {#start} and {#wait}. # {#start} will only be called if partition is zero. # # If timeout is given process will be terminated using {#stop} # when timeout error is raised. # def run(delay: nil, timeout: nil) - if partition > 0 - partition.times.map do - Thread.new do - while (process = next_process) - process.run - end - end - end.each(&:join) + if partition.positive? + run_partition(&:run) else - start delay: delay - wait timeout: timeout + start(delay: delay) + wait(timeout: timeout) end ensure stop end + # Start all process and wait for them to terminate. + # + # Given options will be passed to {#start} and {#wait}. {#start} + # will only be called if partition is zero. + # + # If timeout is given process will be terminated using {#stop} when + # timeout error is raised. + # + # An error will be raised if any process exits unsuccessfully. + # + def run!(delay: nil, timeout: nil) + if partition.positive? + run_partition(&:run!) + else + start(delay: delay) + wait!(timeout: timeout) + end + ensure + stop + end + # Check if group is alive e.g. if at least on process is alive. # # @return [ Boolean ] True if group is alive. # def alive? @@ -148,8 +180,20 @@ @mutex.synchronize do @index ||= 0 @index += 1 processes[@index - 1] end + end + + def run_partition + Array.new(partition) do + Thread.new do + Thread.current.report_on_exception = false + + while (process = next_process) + yield process + end + end + end.each(&:join) end end end