Sha256: d095ab2091a2b9b08c3a3798b559d00fe6f6e311513cd3b4f63004a29c6e0cec
Contents?: true
Size: 1.92 KB
Versions: 1
Compression:
Stored size: 1.92 KB
Contents
require_relative 'work_queue/socket'
require_relative 'work_queue/worker'

# Coordinates a group of Worker objects that share one input socket and one
# output socket. Objects written with #write go to the input socket; objects
# read from the output socket are handed to the callback given to #process.
#
# NOTE(review): Worker, WorkQueue::Socket, DoneProcessing, Aborted and Log are
# defined outside this file; comments about them describe only how they are
# used here. The use of Process.wait in #close suggests each worker runs in a
# child process — confirm against Worker.
class WorkQueue
  attr_accessor :workers, :worker_proc, :callback

  # @param workers [Integer] number of Worker instances to create up front
  # @param block processing block kept as the default for every worker
  def initialize(workers = 0, &block)
    @input = WorkQueue::Socket.new
    @output = WorkQueue::Socket.new
    @workers = workers.times.map { Worker.new }
    @worker_proc = block
    @worker_mutex = Mutex.new
    @removed_workers = []
  end

  # Creates a new worker, registers it and starts it immediately.
  #
  # @param block optional processing block for this worker only; when omitted
  #   the queue-wide worker_proc is used
  # @return [Worker] the worker that was added
  def add_worker(&block)
    worker = Worker.new
    @worker_mutex.synchronize do
      @workers.push(worker)
      worker.process @input, @output, &(block || @worker_proc)
    end
    worker
  end

  # Sets the ignore_ouput flag on every current worker.
  # (The spelling is part of the public interface and of Worker's setter.)
  def ignore_ouput
    @workers.each { |w| w.ignore_ouput = true }
  end

  # Writes a single DoneProcessing marker to the input socket.
  # Presumably the worker that reads it shuts down — confirm against Worker.
  def remove_one_worker
    @input.write DoneProcessing.new
  end

  # Moves every worker whose pid matches from the active list to the list of
  # removed workers, so that #close can later find it by pid.
  #
  # @param pid pid reported by the worker
  # @return [Array] the accumulated list of removed workers
  def remove_worker(pid)
    @worker_mutex.synchronize do
      Log.debug "Remove #{pid}"
      # Array#delete_if returns the receiver (the survivors), so the matching
      # workers must be selected explicitly before they are dropped.
      removed, remaining = @workers.partition { |w| w.pid == pid }
      # replace keeps the same Array object, so outside references obtained
      # through the workers accessor stay valid.
      @workers.replace(remaining)
      @removed_workers.concat(removed)
    end
  end

  # Starts every registered worker with the queue-wide worker_proc and spawns
  # a reader thread that consumes the output socket until it is sent Aborted.
  #
  # @param callback optional block called with each object read from the
  #   output socket that is not a DoneProcessing marker
  # @return [Thread, nil] the reader thread
  def process(&callback)
    @workers.each do |w|
      w.process @input, @output, &@worker_proc
    end

    @reader = Thread.new do
      begin
        while true
          obj = @output.read
          if DoneProcessing === obj
            # A DoneProcessing carrying a pid means that worker has finished.
            remove_worker obj.pid if obj.pid
          else
            callback.call obj if callback
          end
        end
      rescue Aborted
        # Raised from #close to stop the reader loop; nothing to clean up.
      end
    end if @output
  end

  # Writes an object to the input socket for the workers to pick up.
  def write(obj)
    @input.write obj
  end

  # Shuts the queue down: sends one DoneProcessing marker per iteration, reaps
  # a child process, and passes its exit status to the matching removed
  # worker. Finally stops the reader thread by raising Aborted in it.
  def close
    while @worker_mutex.synchronize { @workers.length } > 0
      begin
        @input.write DoneProcessing.new
        pid = Process.wait
        status = $?
        # Take exactly the worker that owns the reaped pid out of the removed
        # list; delete_if(...).first would return a non-matching one instead.
        worker = @worker_mutex.synchronize do
          index = @removed_workers.index { |w| w.pid == pid }
          @removed_workers.delete_at(index) if index
        end
        worker.exit status.exitstatus if worker
      rescue Errno::ECHILD
        # No children left to reap; wait for the reader thread to finish
        # moving the remaining workers out of the active list.
        Thread.pass until @workers.length == 0
        break
      end
    end
    @reader.raise Aborted if @reader
  end

  # Blocks until the reader thread, if any, has finished.
  def join
    @reader.join if @reader
  end
end
Version data entries
1 entries across 1 versions & 1 rubygems
Version | Path |
---|---|
scout-gear-6.0.0 | lib/scout/work_queue.rb |