lib/ztk/parallel.rb in ztk-0.2.4 vs lib/ztk/parallel.rb in ztk-0.2.5
- old
+ new
@@ -74,16 +74,17 @@
# @param [Hash] config Configuration options hash.
# @option config [Integer] :max_forks Maximum number of forks to use.
# @option config [Proc] :before_fork (nil) Proc to call before forking.
# @option config [Proc] :after_fork (nil) Proc to call after forking.
- def initialize(config={})
+ def initialize(configuration={})
super({
:max_forks => MAX_FORKS
- }.merge(config))
+ }.merge(configuration))
+ config.logger.debug { "config(#{config.inspect})" }
- raise ParallelError, "max_forks must be equal to or greater than one!" if @config.max_forks < 1
+ (config.max_forks < 1) and log_and_raise(ParallelError, "max_forks must be equal to or greater than one!")
@forks = Array.new
@results = Array.new
GC.respond_to?(:copy_on_write_friendly=) and GC.copy_on_write_friendly = true
end
@@ -93,38 +94,38 @@
# @yield Block should execute tasks to be performed in parallel.
# @yieldreturn [Object] Block can return any object to be marshalled back to
# the parent processes result set.
# @return [Integer] Returns the pid of the child process forked.
def process(&block)
- raise ParallelError, "You must supply a block to the process method!" if !block_given?
+ !block_given? and log_and_raise(ParallelError, "You must supply a block to the process method!")
- log(:debug) { "forks(#{@forks.inspect})" }
+ config.logger.debug { "forks(#{@forks.inspect})" }
- while (@forks.count >= @config.max_forks) do
+ while (@forks.count >= config.max_forks) do
wait
end
child_reader, parent_writer = IO.pipe
parent_reader, child_writer = IO.pipe
- @config.before_fork and @config.before_fork.call(Process.pid)
+ config.before_fork and config.before_fork.call(Process.pid)
pid = Process.fork do
- @config.after_fork and @config.after_fork.call(Process.pid)
+ config.after_fork and config.after_fork.call(Process.pid)
parent_writer.close
parent_reader.close
if !(data = block.call).nil?
- log(:debug) { "write(#{data.inspect})" }
+ config.logger.debug { "write(#{data.inspect})" }
child_writer.write(Base64.encode64(Marshal.dump(data)))
end
child_reader.close
child_writer.close
Process.exit!(0)
end
- @config.after_fork and @config.after_fork.call(Process.pid)
+ config.after_fork and config.after_fork.call(Process.pid)
child_reader.close
child_writer.close
fork = {:reader => parent_reader, :writer => parent_writer, :pid => pid}
@@ -140,16 +141,16 @@
#
# @return [Array<pid, status, data>] An array containing the pid,
# status and data returned from the process block. If wait2() fails nil
# is returned.
def wait
- log(:debug) { "wait" }
- log(:debug) { "forks(#{@forks.inspect})" }
+ config.logger.debug { "wait" }
+ config.logger.debug { "forks(#{@forks.inspect})" }
pid, status = (Process.wait2(-1) rescue nil)
if !pid.nil? && !status.nil? && !(fork = @forks.select{ |f| f[:pid] == pid }.first).nil?
data = (Marshal.load(Base64.decode64(fork[:reader].read.to_s)) rescue nil)
- log(:debug) { "read(#{data.inspect})" }
+ config.logger.debug { "read(#{data.inspect})" }
!data.nil? and @results.push(data)
fork[:reader].close
fork[:writer].close
@forks -= [fork]
@@ -160,21 +161,21 @@
# Waits for all forks to finish.
#
# @return [Array<Object>] The results from all of the *process* blocks.
def waitall
- log(:debug) { "waitall" }
+ config.logger.debug { "waitall" }
while @forks.count > 0
self.wait
end
@results
end
# Count of active forks.
#
# @return [Integer] Current number of active forks.
def count
- log(:debug) { "count(#{@forks.count})" }
+ config.logger.debug { "count(#{@forks.count})" }
@forks.count
end
end