lib/multi_process/group.rb in multi_process-0.4.0 vs lib/multi_process/group.rb in multi_process-0.5.0
- old
+ new
@@ -1,11 +1,11 @@
module MultiProcess
-
+ #
# Store and run a group of processes.
#
class Group
-
+ #
# Return list of processes.
attr_reader :processes
# Receiver all processes in group should use.
#
@@ -20,77 +20,72 @@
#
# @param opts [ Hash ] Options
# @option otps [ Receiver ] :receiver Receiver to use for new added
# processes. Defaults to `MultiProcess::Logger.global`.
#
- def initialize(opts = {})
+ def initialize(receiver: nil, partition: nil)
@processes = []
- @receiver = opts[:receiver] ? opts[:receiver] : MultiProcess::Logger.global
- @partition = opts[:partition] ? opts[:partition].to_i : 0
+ @receiver = receiver ? receiver : MultiProcess::Logger.global
+ @partition = partition ? partition.to_i : 0
@mutex = Mutex.new
end
# Add new process or list of processes.
#
# If group was already started added processes will also be started.
#
# @param process [Process, Array<Process>] New process or processes.
#
- def <<(process)
- Array(process).flatten.each do |process|
+ def <<(processes)
+ Array(processes).flatten.each do |process|
processes << process
process.receiver = receiver
- if started?
- start process
- end
+ start process if started?
end
end
# Start all process in group.
#
# Call blocks until all processes are started.
#
- # @param opts [ Hash ] Options.
- # @option opts [ Integer ] :delay Delay in seconds between starting processes.
+ # @option delay [Integer] Delay in seconds between starting processes.
#
- def start(opts = {})
+ def start(delay: nil)
processes.each do |process|
- unless process.started?
- process.start
- sleep opts[:delay] if opts[:delay]
- end
+ next if process.started?
+
+ process.start
+ sleep delay if delay
end
end
# Check if group was already started.
#
# @return [ Boolean ] True if group was already started.
#
def started?
- processes.any? &:started?
+ processes.any?(&:started?)
end
# Stop all processes.
#
def stop
- processes.each do |process|
- process.stop
- end
+ processes.each(&:stop)
end
# Wait until all process terminated.
#
# @param opts [ Hash ] Options.
# @option opts [ Integer ] :timeout Timeout in seconds to wait before
# raising {Timeout::Error}.
#
- def wait(opts = {})
- opts[:timeout] ||= 30
-
- ::Timeout::timeout(opts[:timeout]) do
- processes.each{|p| p.wait}
+ def wait(timeout: nil)
+ if timeout
+ ::Timeout.timeout(timeout) { wait }
+ else
+ processes.each(&:wait)
end
end
# Start all process and wait for them to terminate.
#
@@ -98,42 +93,40 @@
# {#start} will only be called if partition is zero.
#
# If timeout is given process will be terminated using {#stop}
# when timeout error is raised.
#
- def run(opts = {})
+ def run(**kwargs)
if partition > 0
- threads = Array.new
- partition.times do
- threads << Thread.new do
+ partition.times.map do
+ Thread.new do
while (process = next_process)
process.run
end
end
- end
- threads.each &:join
+ end.each(&:join)
else
- start opts
- wait opts
+ start(**kwargs)
+ wait(**kwargs)
end
ensure
stop
end
# Check if group is alive e.g. if at least on process is alive.
#
# @return [ Boolean ] True if group is alive.
#
def alive?
- processes.any? &:alive?
+ processes.any?(&:alive?)
end
# Check if group is available. The group is available if all
# processes are available.
#
def available?
- !processes.any?{|p| !p.available? }
+ processes.all?(:available?)
end
# Wait until group is available. This implies waiting until
# all processes in group are available.
#
@@ -141,18 +134,17 @@
#
# @param opts [ Hash ] Options.
# @option opts [ Integer ] :timeout Timeout in seconds to wait for processes
# to become available. Defaults to {MultiProcess::DEFAULT_TIMEOUT}.
#
- def available!(opts = {})
- timeout = opts[:timeout] ? opts[:timeout].to_i : MultiProcess::DEFAULT_TIMEOUT
-
+ def available!(timeout: MultiProcess::DEFAULT_TIMEOUT)
Timeout.timeout timeout do
- processes.each{|p| p.available! }
+ processes.each(&:available!)
end
end
private
+
def next_process
@mutex.synchronize do
@index ||= 0
@index += 1
processes[@index - 1]