lib/ztk/parallel.rb in ztk-0.2.4 vs lib/ztk/parallel.rb in ztk-0.2.5

- old
+ new

@@ -74,16 +74,17 @@ # @param [Hash] config Configuration options hash. # @option config [Integer] :max_forks Maximum number of forks to use. # @option config [Proc] :before_fork (nil) Proc to call before forking. # @option config [Proc] :after_fork (nil) Proc to call after forking. - def initialize(config={}) + def initialize(configuration={}) super({ :max_forks => MAX_FORKS - }.merge(config)) + }.merge(configuration)) + config.logger.debug { "config(#{config.inspect})" } - raise ParallelError, "max_forks must be equal to or greater than one!" if @config.max_forks < 1 + (config.max_forks < 1) and log_and_raise(ParallelError, "max_forks must be equal to or greater than one!") @forks = Array.new @results = Array.new GC.respond_to?(:copy_on_write_friendly=) and GC.copy_on_write_friendly = true end @@ -93,38 +94,38 @@ # @yield Block should execute tasks to be performed in parallel. # @yieldreturn [Object] Block can return any object to be marshalled back to # the parent processes result set. # @return [Integer] Returns the pid of the child process forked. def process(&block) - raise ParallelError, "You must supply a block to the process method!" if !block_given? + !block_given? and log_and_raise(ParallelError, "You must supply a block to the process method!") - log(:debug) { "forks(#{@forks.inspect})" } + config.logger.debug { "forks(#{@forks.inspect})" } - while (@forks.count >= @config.max_forks) do + while (@forks.count >= config.max_forks) do wait end child_reader, parent_writer = IO.pipe parent_reader, child_writer = IO.pipe - @config.before_fork and @config.before_fork.call(Process.pid) + config.before_fork and config.before_fork.call(Process.pid) pid = Process.fork do - @config.after_fork and @config.after_fork.call(Process.pid) + config.after_fork and config.after_fork.call(Process.pid) parent_writer.close parent_reader.close if !(data = block.call).nil? - log(:debug) { "write(#{data.inspect})" } + config.logger.debug { "write(#{data.inspect})" } child_writer.write(Base64.encode64(Marshal.dump(data))) end child_reader.close child_writer.close Process.exit!(0) end - @config.after_fork and @config.after_fork.call(Process.pid) + config.after_fork and config.after_fork.call(Process.pid) child_reader.close child_writer.close fork = {:reader => parent_reader, :writer => parent_writer, :pid => pid} @@ -140,16 +141,16 @@ # # @return [Array<pid, status, data>] An array containing the pid, # status and data returned from the process block. If wait2() fails nil # is returned. def wait - log(:debug) { "wait" } - log(:debug) { "forks(#{@forks.inspect})" } + config.logger.debug { "wait" } + config.logger.debug { "forks(#{@forks.inspect})" } pid, status = (Process.wait2(-1) rescue nil) if !pid.nil? && !status.nil? && !(fork = @forks.select{ |f| f[:pid] == pid }.first).nil? data = (Marshal.load(Base64.decode64(fork[:reader].read.to_s)) rescue nil) - log(:debug) { "read(#{data.inspect})" } + config.logger.debug { "read(#{data.inspect})" } !data.nil? and @results.push(data) fork[:reader].close fork[:writer].close @forks -= [fork] @@ -160,21 +161,21 @@ # Waits for all forks to finish. # # @return [Array<Object>] The results from all of the *process* blocks. def waitall - log(:debug) { "waitall" } + config.logger.debug { "waitall" } while @forks.count > 0 self.wait end @results end # Count of active forks. # # @return [Integer] Current number of active forks. def count - log(:debug) { "count(#{@forks.count})" } + config.logger.debug { "count(#{@forks.count})" } @forks.count end end