Sha256: 003f33e4a6e74b3b2c5162cd77ac4b74784196c4b51fa4762d754958fe079457

Contents?: true

Size: 1.51 KB

Versions: 2

Compression:

Stored size: 1.51 KB

Contents

#
# Runs many jobs in parallel, and returns their interleaved results.
# (NOTE: The JobRunner can be run multiple times; each time the blocks
#  will be executed again.)
#
# Examples:
#
# JobRunner.new do |jr|
#   jr.add { 3 }
#   jr.add { sleep 0.1; 2 }
#   jr.add { sleep 0.2; 1 }
#
#   jr.each_result do |result|
#     p result
#   end
# end
#
# jr = JobRunner.new(
#   proc { 1 },
#   proc { 2 },
#   proc { 3 }
# )
#
# 2.times do
#   jr.each_result { |result| p result }
# end
#
class JobRunner
  # Builds a runner and registers its jobs.
  #
  # Jobs may be passed as positional callables (each one is registered through
  # #add). When no positional jobs are given and a block is supplied, the block
  # is called with the new runner so it can register jobs itself.
  def initialize(*blocks)
    @threads = []
    @results = Thread::Queue.new
    @jobs    = []
    @started = false
    @pending = 0 # results from the current run that have not been consumed yet

    if blocks.any?
      blocks.each { |block| add(&block) }
    elsif block_given?
      yield self
    end
  end

  # Registers a job. The block is executed in its own thread on every run.
  #
  # Raises ArgumentError when called without a block, instead of enqueueing a
  # nil job that would only fail later inside a worker thread.
  def add(&block)
    raise ArgumentError, "JobRunner#add requires a block" unless block

    @jobs << block
  end

  # Drops worker threads that have finished from the bookkeeping list.
  def reap!
    @threads.delete_if { |t| !t.alive? } unless @threads.empty?
  end

  # Starts one thread per registered job.
  #
  # Raises RuntimeError if a run is already in progress.
  def go!
    raise "Error: already started" if @started

    @started = true

    # Every run gets its own queue, captured in a local: workers left over
    # from an abandoned run (consumer raised or used `break`) keep writing to
    # the old queue and cannot leak stale values into this run.
    results  = @results = Thread::Queue.new
    @threads = @jobs.map do |job|
      Thread.new(job) do |work|
        # The ensure clause guarantees exactly one entry per job, so the
        # consumer can count entries rather than poll thread liveness.
        outcome = [:aborted, nil]
        begin
          outcome = [:ok, work.call]
        rescue StandardError => e
          outcome = [:error, e]
        ensure
          results << outcome
        end
      end
    end
    @pending = @threads.size

    @jobs
  end

  # Yields each job's return value as soon as it is available (completion
  # order, not registration order). Starts the jobs first unless #go! was
  # already called.
  #
  # A StandardError raised by a job is re-raised here, in the calling thread.
  # Without a block, returns an Enumerator over the results.
  #
  # The runner is reusable afterwards, even if the block raised or used
  # `break`.
  def each_result
    return enum_for(:each_result) unless block_given?

    go! unless @started

    begin
      results = @results
      while @pending > 0
        status, value = results.pop
        @pending -= 1

        case status
        when :ok    then yield value
        when :error then raise value
        end
        # :aborted entries (job died with a non-StandardError) yield nothing.
      end
    ensure
      reap!
      @started = false
    end
  end
end


# Demo, executed only when this file is run directly as a script.
if $PROGRAM_NAME == __FILE__
  # Block form: register the jobs on the yielded runner, then print each
  # result as it is delivered.
  JobRunner.new do |runner|
    runner.add { 3 }
    runner.add { sleep 0.1; 2 }
    runner.add { sleep 0.2; 1 }

    runner.each_result { |value| p value }
  end

  puts

  # Argument form: pass the jobs up front and run the same runner twice.
  runner = JobRunner.new(proc { 1 }, proc { 2 }, proc { 3 })

  2.times do
    runner.each_result { |value| p value }
    puts
  end
end

Version data entries

2 entries across 2 versions & 1 rubygems

Version Path
epitools-0.5.130 lib/epitools/job_runner.rb
epitools-0.5.129 lib/epitools/job_runner.rb