lib/multi_process/group.rb in multi_process-1.1.1 vs lib/multi_process/group.rb in multi_process-1.2.0
- old
+ new
@@ -1,5 +1,7 @@
+# frozen_string_literal: true
+
module MultiProcess
#
# Store and run a group of processes.
#
class Group
@@ -22,11 +24,11 @@
# @option otps [ Receiver ] :receiver Receiver to use for new added
# processes. Defaults to `MultiProcess::Logger.global`.
#
def initialize(receiver: nil, partition: nil)
@processes = []
- @receiver = receiver ? receiver : MultiProcess::Logger.global
+ @receiver = receiver || MultiProcess::Logger.global
@partition = partition ? partition.to_i : 0
@mutex = Mutex.new
end
# Add new process or list of processes.
@@ -85,35 +87,65 @@
else
processes.each(&:wait)
end
end
+ # Wait until all process terminated.
+ #
+ # Raise an error if a process exists unsuccessfully.
+ #
+ # @param opts [ Hash ] Options.
+ # @option opts [ Integer ] :timeout Timeout in seconds to wait before raising {Timeout::Error}.
+ #
+ def wait!(timeout: nil)
+ if timeout
+ ::Timeout.timeout(timeout) { wait! }
+ else
+ processes.each(&:wait!)
+ end
+ end
+
# Start all process and wait for them to terminate.
#
# Given options will be passed to {#start} and {#wait}.
# {#start} will only be called if partition is zero.
#
# If timeout is given process will be terminated using {#stop}
# when timeout error is raised.
#
def run(delay: nil, timeout: nil)
- if partition > 0
- partition.times.map do
- Thread.new do
- while (process = next_process)
- process.run
- end
- end
- end.each(&:join)
+ if partition.positive?
+ run_partition(&:run)
else
- start delay: delay
- wait timeout: timeout
+ start(delay: delay)
+ wait(timeout: timeout)
end
ensure
stop
end
+ # Start all process and wait for them to terminate.
+ #
+ # Given options will be passed to {#start} and {#wait}. {#start}
+ # will only be called if partition is zero.
+ #
+ # If timeout is given process will be terminated using {#stop} when
+ # timeout error is raised.
+ #
+ # An error will be raised if any process exits unsuccessfully.
+ #
+ def run!(delay: nil, timeout: nil)
+ if partition.positive?
+ run_partition(&:run!)
+ else
+ start(delay: delay)
+ wait!(timeout: timeout)
+ end
+ ensure
+ stop
+ end
+
# Check if group is alive e.g. if at least on process is alive.
#
# @return [ Boolean ] True if group is alive.
#
def alive?
@@ -148,8 +180,20 @@
@mutex.synchronize do
@index ||= 0
@index += 1
processes[@index - 1]
end
+ end
+
+ def run_partition
+ Array.new(partition) do
+ Thread.new do
+ Thread.current.report_on_exception = false
+
+ while (process = next_process)
+ yield process
+ end
+ end
+ end.each(&:join)
end
end
end