require 'rbbt/util/concurrency/processes/worker'
require 'rbbt/util/concurrency/processes/socket'

# Distributes work items to a fixed number of RbbtProcessQueueWorker
# instances through a shared RbbtProcessSocket, and optionally routes values
# read from a second socket to a callback block that runs in a dedicated
# thread of the calling process.
#
# Typical life cycle (see RbbtProcessQueue.each):
#
#   q = RbbtProcessQueue.new 3
#   q.callback { |value| ... }  # optional; call before init so the workers
#                               # are handed the callback queue
#   q.init { |elem| ... }       # block is passed to every worker
#   q.process elem
#   q.join
#
# Aborted, ClosedStream, Log, RbbtProcessSocket and RbbtProcessQueueWorker
# are defined outside this file.
class RbbtProcessQueue
  attr_accessor :num_processes, :processes, :queue, :process_monitor, :cleanup

  # Only the writer is generated: the reader name is taken by #join below.
  attr_writer :join

  # num_processes:: number of workers created by #init.
  # cleanup::       passed as-is to every RbbtProcessQueueWorker.
  # join::          optional callable invoked at the very end of #join.
  def initialize(num_processes, cleanup = nil, join = nil)
    @num_processes = num_processes
    @processes = []
    @cleanup = cleanup
    @join = join
    @queue = RbbtProcessSocket.new
  end

  attr_accessor :callback_queue, :callback_thread

  # Only the writer is generated: the reader name is taken by #callback below.
  attr_writer :callback

  # With a block: stores it as the callback, creates the callback queue and
  # starts the thread that pops values from that queue and passes them to the
  # block. Returns the new thread.
  #
  # Without a block: resets callback, callback queue and callback thread to
  # nil (note that this makes a plain `q.callback` call destructive).
  def callback(&block)
    if block_given?
      @callback = block

      @callback_queue = RbbtProcessSocket.new

      @callback_thread = Thread.new(Thread.current) do |_parent|
        begin
          loop do
            item = @callback_queue.pop
            # Exceptions travel through the queue as values (bare, or as the
            # first element of an Array) and are re-raised in this thread.
            raise item if Exception === item
            raise item.first if Array === item and Exception === item.first
            @callback.call item
          end
        rescue Aborted
          Log.warn "Callback thread aborted"
          @process_monitor.raise Aborted.new
          raise
        rescue ClosedStream
          # Sentinel pushed by #close_callback: leave the loop quietly.
        rescue Exception
          Log.warn "Callback thread exception: #{$!.message}"
          # Forward the failure to the process monitor before dying with it.
          @process_monitor.raise $!
          raise
        ensure
          @callback_queue.sread.close unless @callback_queue.sread.closed?
        end
      end
    else
      @callback, @callback_queue, @callback_thread = nil, nil, nil
    end
  end

  # Creates +num_processes+ workers, each receiving the work queue, the
  # callback queue (nil when no callback was registered), the cleanup value
  # and +block+, then starts the monitor thread that joins the workers one
  # after another. Returns the monitor thread.
  def init(&block)
    num_processes.times do
      @processes << RbbtProcessQueueWorker.new(@queue, @callback_queue, @cleanup, &block)
    end
    # NOTE(review): presumably releases this process's read end of the work
    # queue, since only workers read from it -- confirm in RbbtProcessSocket.
    @queue.close_read

    @process_monitor = Thread.new(Thread.current) do |_parent|
      begin
        # Workers are joined in order and dropped from the list as they end.
        while @processes.any?
          @processes[0].join
          @processes.shift
        end
      rescue Aborted
        Log.warn "Aborting process monitor"
        @processes.each { |worker| worker.abort }
        @processes.each { |worker| worker.join }
      rescue Exception
        Log.warn "Process monitor exception: #{$!.message}"
        @processes.each { |worker| worker.abort }
        @callback_thread.raise $! if @callback_thread and @callback_thread.alive?
        raise
      end
    end
  end

  # Asks the callback thread to stop by pushing a ClosedStream sentinel
  # (only while the thread is alive) and waits for it. Errors while pushing
  # are logged and ignored; the join is unconditional.
  def close_callback
    begin
      @callback_queue.push ClosedStream.new if @callback_thread.alive?
    rescue Exception
      Log.warn "Error closing callback: #{$!.message}"
    end
    @callback_thread.join
  end

  # Pushes one ClosedStream sentinel per remaining worker, waits for the
  # process monitor and the callback thread (if any), closes the write end of
  # the work queue and finally invokes the +join+ callable given to #new.
  #
  # Re-raises any exception hit while joining, after logging it.
  def join
    begin
      if @process_monitor.alive?
        @processes.length.times do
          @queue.push ClosedStream.new
        end
      end
    rescue Exception
      # Deliberately best-effort: a failed push must not prevent the joins
      # and the socket clean-up below.
    end

    begin
      @process_monitor.join
      close_callback if @callback
    rescue Exception
      Log.error "Exception joining queue: #{$!.message}"
      raise
    ensure
      @queue.swrite.close unless @queue.swrite.closed?
    end

    @join.call if @join
  end

  # Aborts the queue when the process monitor or the callback thread is still
  # running, otherwise joins it.
  #
  # Both threads are nil-checked, so a queue without a callback no longer
  # fails with NoMethodError on the missing callback thread. #join itself
  # still requires that #init has been called.
  def clean
    monitor_alive = @process_monitor && @process_monitor.alive?
    callback_alive = @callback_thread && @callback_thread.alive?

    if monitor_alive or callback_alive
      self.abort
    else
      self.join
    end
  end

  # Raises Aborted in the process monitor and in the callback thread, waiting
  # for each, and always finishes with #join.
  #
  # The guard covers the raise as well as the join: a trailing modifier `if`
  # applies only to the last statement on a `;`-separated line, which used to
  # leave the raise unguarded and failed on a nil callback thread.
  #
  # Thread#join re-raises the exception that terminated the joined thread, so
  # the Aborted re-raised by the callback thread can propagate to the caller.
  def abort
    begin
      if @process_monitor and @process_monitor.alive?
        @process_monitor.raise(Aborted.new)
        @process_monitor.join
      end

      if @callback_thread and @callback_thread.alive?
        @callback_thread.raise(Aborted.new)
        @callback_thread.join
      end
    ensure
      join
    end
  end

  # Pushes the arguments, as an Array, onto the work queue.
  #
  # Raises Aborted when the push fails with Errno::EPIPE.
  def process(*e)
    @queue.push e
  rescue Errno::EPIPE
    raise Aborted
  end

  # Convenience wrapper: processes every element of +list+ with +num+ workers
  # running +block+, then joins the queue. No callback is registered.
  def self.each(list, num = 3, &block)
    q = RbbtProcessQueue.new num
    q.init(&block)
    list.each do |elem|
      q.process elem
    end
    q.join
  end
end