Sha256: c4c760039efd28c737ab8d49e1d948461d6718e2f362a9019b61f426819ad44c
Contents?: true
Size: 1.48 KB
Versions: 1
Compression:
Stored size: 1.48 KB
Contents
require "thread"

#
# Usage:
#
#   puts "Creating task queue with 5 concurrent workers"
#   tasks = ParallelTasks.new(5) { puts "Worker thread starting up" }
#
#   puts "Starting workers"
#   tasks.start
#
#   puts "Making some work"
#   20.times do
#     tasks.add do
#       x = rand(5)
#       puts "Sleeping for #{x}s"
#       sleep x
#     end
#   end
#
#   puts "Waiting for workers to finish"
#   tasks.finish
#
#   puts "Done"
#

# A small fixed-size pool of worker threads that drain a shared queue of
# blocks.
#
# Lifecycle: #start spawns the workers, #add enqueues blocks, and #finish
# signals that no more work is coming and waits for the workers to exit.
#
# Error handling: a task that raises a StandardError is reported and retried
# once. If the retry raises too, the exception terminates that worker thread
# and is re-raised from #finish when the thread is joined.
class ParallelTasks
  # Seconds an idle worker waits before polling the queue again.
  POLL_INTERVAL = 0.1

  # @param num_workers [Integer] number of threads spawned by each #start call
  # @param initialization_proc [Proc, nil] optional block run once at the
  #   beginning of every worker thread, before it takes any task
  def initialize(num_workers, &initialization_proc)
    @num_workers = num_workers
    @initialization_proc = initialization_proc
    @workers = []
    @work_queue = Queue.new
    @done_making_tasks = false
  end

  # Spawns the worker threads. Each one runs the initialization block (if
  # any) and then processes queued tasks until #finish has been called and
  # the queue is empty.
  def start
    @num_workers.times do
      @workers << Thread.new { run_worker }
    end
  end

  # Enqueues a block to be executed by one of the workers.
  def add(&proc)
    @work_queue << proc
  end

  # Declares that no more tasks will be added, then blocks until every
  # worker thread has exited.
  #
  # @raise [Exception] whatever exception terminated a worker thread, as
  #   re-raised by Thread#join
  def finish
    @done_making_tasks = true
    @workers.each { |t| t.join }
  end

  private

  # Main loop of a single worker thread.
  def run_worker
    @initialization_proc.call if @initialization_proc

    loop do
      # Read the flag BEFORE polling the queue. If it was already set and the
      # queue then turns out to be empty, everything added before #finish has
      # been taken. Checking in the opposite order could miss a task enqueued
      # between the two checks.
      finishing = @done_making_tasks

      begin
        # Queue is thread-safe on its own; a non-blocking pop raises
        # ThreadError when the queue is empty.
        task = @work_queue.pop(true)
      rescue ThreadError
        break if finishing
        # Sleep outside any lock so idle workers never delay busy ones.
        sleep POLL_INTERVAL
        next
      end

      # A nil entry (from #add called without a block) is skipped.
      run_task(task) if task
    end
  end

  # Runs one task; on a StandardError the error is reported and the task is
  # retried once. An exception from the retry is not rescued here.
  def run_task(task)
    task.call
  rescue StandardError => e
    log_error(e.inspect)
    task.call
  end

  # Reports through a `log` method when this object has one (for example one
  # mixed in elsewhere), otherwise writes to standard error.
  def log_error(message)
    if respond_to?(:log, true)
      log.error message
    else
      warn message
    end
  end
end
Version data entries
1 entries across 1 versions & 1 rubygems
Version | Path |
---|---|
dbox-0.6.5 | lib/dbox/parallel_tasks.rb |