Sha256: 6204174093b4687eb807fa0b9209dea8e2b6424463cdae3edaa5a23ba581f3d2

Contents?: true

Size: 1.62 KB

Versions: 773

Compression:

Stored size: 1.62 KB

Contents

# A fixed-size pool of worker threads fed through a shared Queue.
#
# Usage: call +init+ with the block that handles one element, feed elements
# with +process+, then call +join+ to wait until the queue has drained and
# stop the workers. +RbbtThreadQueue.each+ wraps those three steps.
#
# Relies on the +Aborted+ exception class, which is defined outside this file.
class RbbtThreadQueue
  attr_accessor :num_threads, :threads, :queue, :mutex, :block, :done

  # Thread that repeatedly pops an element from a queue and passes it,
  # splatted, to the block it was created with.
  class RbbtThreadQueueWorker < Thread
    # queue - object whose +pop+ blocks until the next element is available.
    # mutex - optional; when given it is passed to the block as an extra
    #         last argument so the block can synchronize on it.
    # block - called once per popped element.
    #
    # An exception raised by the block terminates the thread with that
    # exception. +Aborted+ is treated as a stop request instead: the thread
    # finishes quietly, so joining it afterwards does not re-raise.
    def initialize(queue, mutex = nil, &block)
      super() do
        begin
          loop do
            job = queue.pop
            unless mutex.nil?
              # Build a new argument list; appending in place would leak the
              # mutex into the caller's own array and fail on frozen arrays.
              job = Array === job ? job + [mutex] : [job, mutex]
            end
            block.call(*job)
          end
        rescue Aborted
          # Stop request sent by #clean or RbbtThreadQueue#join.
        end
      end
    end

    # Asks the worker to stop by raising +Aborted+ inside it.
    # (Inside a Thread subclass, +raise+ resolves to Thread#raise, which
    # targets this thread rather than the caller.) No-op once it is dead.
    def clean
      raise Aborted if alive?
    end
  end

  # num_threads - how many workers +init+ will start.
  def initialize(num_threads)
    @num_threads = num_threads
    @threads = []
    @queue = Queue.new
    @mutex = Mutex.new
  end

  # Stops any workers left from a previous +init+ and starts +num_threads+
  # new ones running +block+.
  #
  # use_mutex - when true, every block call receives the shared +mutex+ as
  #             an additional last argument.
  def init(use_mutex = false, &block)
    clean
    num_threads.times do
      @threads << RbbtThreadQueueWorker.new(queue, use_mutex ? mutex : nil, &block)
    end
  end

  # Waits until the queue is empty and every live worker is idle (blocked on
  # +pop+), then stops the workers. Call +init+ again before reusing the
  # queue.
  #
  # Raises RuntimeError when elements remain but every worker has died.
  def join
    while queue.length > 0 || queue.num_waiting < @threads.length
      Thread.pass
      # Forget workers that died (e.g. the block raised); they will never be
      # counted in num_waiting, so keeping them would make this loop spin
      # forever and the check below could never trigger.
      @threads.delete_if { |t| !t.alive? }
      raise "No worker thread survived" if @threads.empty? && queue.length > 0
    end
    # Remaining workers are alive and idle: ask each to stop, then give it a
    # short moment to wind down.
    @threads.each { |t| t.raise Aborted }
    @threads.each { |t| t.join(0.1) }
  end

  # Asks every known worker to stop and empties the worker list.
  def clean
    threads.each { |t| t.clean }.clear
  end

  # Enqueues one element for the workers. An Array is splatted into the
  # block's arguments; anything else is passed through Ruby's splat operator.
  def process(e)
    queue << e
  end

  # Runs +block+ over every element of +list+ using +num+ worker threads and
  # returns once all elements have been handled.
  def self.each(list, num = 3, &block)
    q = RbbtThreadQueue.new num
    q.init(&block)
    list.each { |elem| q.process elem }
    q.join
  end
end

Version data entries

773 entries across 773 versions & 1 rubygems

Version Path
rbbt-util-5.44.1 lib/rbbt/util/concurrency/threads.rb
rbbt-util-5.43.0 lib/rbbt/util/concurrency/threads.rb
rbbt-util-5.42.0 lib/rbbt/util/concurrency/threads.rb
rbbt-util-5.41.1 lib/rbbt/util/concurrency/threads.rb
rbbt-util-5.41.0 lib/rbbt/util/concurrency/threads.rb
rbbt-util-5.40.5 lib/rbbt/util/concurrency/threads.rb
rbbt-util-5.40.4 lib/rbbt/util/concurrency/threads.rb
rbbt-util-5.40.3 lib/rbbt/util/concurrency/threads.rb
rbbt-util-5.40.0 lib/rbbt/util/concurrency/threads.rb
rbbt-util-5.39.0 lib/rbbt/util/concurrency/threads.rb
rbbt-util-5.38.1 lib/rbbt/util/concurrency/threads.rb
rbbt-util-5.38.0 lib/rbbt/util/concurrency/threads.rb
rbbt-util-5.37.16 lib/rbbt/util/concurrency/threads.rb
rbbt-util-5.37.15 lib/rbbt/util/concurrency/threads.rb
rbbt-util-5.37.14 lib/rbbt/util/concurrency/threads.rb
rbbt-util-5.37.13 lib/rbbt/util/concurrency/threads.rb
rbbt-util-5.37.12 lib/rbbt/util/concurrency/threads.rb
rbbt-util-5.37.11 lib/rbbt/util/concurrency/threads.rb
rbbt-util-5.37.10 lib/rbbt/util/concurrency/threads.rb
rbbt-util-5.37.9 lib/rbbt/util/concurrency/threads.rb