Sha256: c1370b4a57b109aef9e754dd02381aa9cc5929e5627bf7addd0dfe7efdfc9d8e

Contents?: true

Size: 1.68 KB

Versions: 5

Compression:

Stored size: 1.68 KB

Contents

# Fixed-size pool of worker threads that consume elements from a shared Queue.
#
# Typical use is RbbtThreadQueue.each(list, n) { |elem| ... }; the manual
# sequence is +new+, +init+ (with the work block), +process+ per element and
# finally +join+.
#
# NOTE(review): the +Aborted+ exception class is referenced here but is not
# defined in this file; it is assumed to be provided elsewhere in the project.
class RbbtThreadQueue
  attr_accessor :num_threads, :threads, :queue, :mutex, :block, :done

  # Thread that pops elements from +queue+ forever and hands each one to
  # +block+, until it is stopped with +Aborted+.
  class RbbtThreadQueueWorker < Thread
    # @param queue [Queue] source of work elements
    # @param mutex [Mutex, nil] when given, it is passed to +block+ as an
    #   extra trailing argument
    # @yield the element popped from the queue, splatted (plus +mutex+ if any)
    def initialize(queue, mutex = nil, &block)
      # The creating thread is handed to the worker so failures can be
      # re-raised there.
      super(Thread.current) do |parent|
        begin
          loop do
            item = queue.pop
            if mutex.nil?
              block.call(*item)
            else
              # Build a fresh argument list: appending in place would modify
              # the array object the caller enqueued.
              args = Array === item ? item + [mutex] : [item, mutex]
              block.call(*args)
            end
          end
        rescue Aborted
          # Regular shutdown signal: let the thread finish quietly.
        rescue Exception
          # Deliberately broad: any failure in the work block is forwarded to
          # the thread that created this worker.
          parent.raise $!
        end
      end
    end

    # Stops the worker if it is still running. Inside a Thread subclass a bare
    # +raise+ resolves to Thread#raise, so +Aborted+ is raised in the worker
    # thread itself, not in the caller.
    def clean
      raise Aborted if alive?
    end
  end

  # @param num_threads [Integer] number of workers that +init+ will start
  def initialize(num_threads)
    @num_threads = num_threads
    @threads = []
    @queue = Queue.new
    @mutex = Mutex.new
  end

  # Stops any previous workers and starts +num_threads+ new ones running
  # +block+.
  #
  # @param use_mutex [Boolean] when true, the pool's mutex is passed to
  #   +block+ as the last argument
  def init(use_mutex = false, &block)
    clean
    num_threads.times do
      @threads << RbbtThreadQueueWorker.new(queue, use_mutex ? mutex : nil, &block)
    end
  end

  # Waits until the queue is drained and every live worker is idle (blocked
  # on +queue.pop+), then stops the workers.
  #
  # @raise [RuntimeError] if elements remain queued but no worker is alive
  def join
    loop do
      # Only live workers can ever become idle; counting dead ones would make
      # this wait forever.
      alive = @threads.count(&:alive?)
      raise "No worker thread survived" if alive.zero? && !queue.empty?
      break if queue.empty? && queue.num_waiting >= alive

      Thread.pass
    end

    # Idle workers are blocked in +queue.pop+; Aborted makes them exit.
    @threads.each { |t| t.raise Aborted if t.alive? }
    @threads.each { |t| t.join(0.1) }
    @threads.clear
  end

  # Aborts every worker that is still alive and empties the worker list.
  def clean
    threads.each { |t| t.clean }.clear
  end

  # Enqueues +e+ for processing by the workers.
  def process(e)
    queue << e
  end

  # Runs +block+ over every element of +list+ using +num+ worker threads and
  # returns once all elements have been processed.
  #
  # @param list [#each] elements to process
  # @param num [Integer] number of worker threads
  def self.each(list, num = 3, &block)
    q = RbbtThreadQueue.new num
    q.init(&block)
    list.each { |elem| q.process elem }
    q.join
  end
end

Version data entries

5 entries across 5 versions & 1 rubygems

Version Path
rbbt-util-5.13.4 lib/rbbt/util/concurrency/threads.rb
rbbt-util-5.13.3 lib/rbbt/util/concurrency/threads.rb
rbbt-util-5.13.2 lib/rbbt/util/concurrency/threads.rb
rbbt-util-5.13.1 lib/rbbt/util/concurrency/threads.rb
rbbt-util-5.13.0 lib/rbbt/util/concurrency/threads.rb