Sha256: 0be71f2b5bc4c4c98ea52b9bb23c934ce4cf7c6019cb77ffb91fab49bfce6b0a

Contents?: true

Size: 1.68 KB

Versions: 26

Compression:

Stored size: 1.68 KB

Contents

class RbbtThreadQueue
  attr_accessor :num_threads, :threads, :queue, :mutex, :block, :done

  class RbbtThreadQueueWorker < Thread
    def initialize(queue, mutex = nil, &block)
      if mutex.nil?
        super(Thread.current) do |current|
          begin
            loop do
              p = queue.pop
              block.call *p
            end
          rescue Exception
            current.raise $! unless Aborted === $!
          end
        end
      else
        super(Thread.current) do |current|
          begin
            loop do
              p = queue.pop
              p = Array === p ? p << mutex : [p,mutex]
              block.call *p
            end
          rescue Exception
            current.raise $! unless Aborted === $!
          end
        end
      end
    end

    def clean
      raise Aborted if alive?
    end
  end

  def initialize(num_threads)
    @num_threads = num_threads
    @threads = []
    @queue = Queue.new
    @mutex = Mutex.new
  end

  def init(use_mutex = false, &block)
    clean
    num_threads.times do |i|
      @threads << RbbtThreadQueueWorker.new(queue, use_mutex ? mutex : nil, &block)
    end
  end

  def join
    while queue.length > 0 or queue.num_waiting < @threads.length
      Thread.pass 
      raise "No worker thread survived" if @threads.empty? and queue.length > 0
    end
    @threads.delete_if{|t| t.alive?}
    @threads.each{|t| t.raise Aborted } 
    @threads.each{|t| t.join(0.1) } 
  end

  def clean
    threads.each{ |t| t.clean }.clear
  end

  def process(e)
    queue << e
  end

  def self.each(list, num = 3, &block)
    q = RbbtThreadQueue.new num
    q.init(&block)
    list.each do |elem| q.process elem end
    q.join
  end
end

Version data entries

26 entries across 26 versions & 1 rubygems

Version Path
rbbt-util-5.12.3 lib/rbbt/util/concurrency/threads.rb
rbbt-util-5.12.2 lib/rbbt/util/concurrency/threads.rb
rbbt-util-5.12.1 lib/rbbt/util/concurrency/threads.rb
rbbt-util-5.12.0 lib/rbbt/util/concurrency/threads.rb
rbbt-util-5.11.9 lib/rbbt/util/concurrency/threads.rb
rbbt-util-5.11.8 lib/rbbt/util/concurrency/threads.rb
rbbt-util-5.11.7 lib/rbbt/util/concurrency/threads.rb
rbbt-util-5.11.6 lib/rbbt/util/concurrency/threads.rb
rbbt-util-5.11.5 lib/rbbt/util/concurrency/threads.rb
rbbt-util-5.11.4 lib/rbbt/util/concurrency/threads.rb
rbbt-util-5.11.3 lib/rbbt/util/concurrency/threads.rb
rbbt-util-5.11.2 lib/rbbt/util/concurrency/threads.rb
rbbt-util-5.11.1 lib/rbbt/util/concurrency/threads.rb
rbbt-util-5.10.2 lib/rbbt/util/concurrency/threads.rb
rbbt-util-5.10.1 lib/rbbt/util/concurrency/threads.rb
rbbt-util-5.9.12 lib/rbbt/util/concurrency/threads.rb
rbbt-util-5.9.11 lib/rbbt/util/concurrency/threads.rb
rbbt-util-5.9.10 lib/rbbt/util/concurrency/threads.rb
rbbt-util-5.9.8 lib/rbbt/util/concurrency/threads.rb
rbbt-util-5.9.7 lib/rbbt/util/concurrency/threads.rb