Sha256: c4570a36fbf1905d58840d12abeb02f3cd69b98e927a66c170fc76fba03ccdfe

Contents?: true

Size: 1.65 KB

Versions: 10

Compression:

Stored size: 1.65 KB

Contents

class RbbtThreadQueue
  attr_accessor :num_threads, :threads, :queue, :mutex, :block, :done

  class RbbtThreadQueueWorker < Thread
    def initialize(queue, mutex = nil, &block)
      if mutex.nil?
        super(Thread.current) do |current|
          begin
            loop do
              p = queue.pop
              block.call *p
            end
          rescue Exception
            current.raise $! unless Aborted === $!
          end
        end
      else
        super(Thread.current) do |current|
          begin
            loop do
              p = queue.pop
              p << mutex 
              block.call *p
            end
          rescue Exception
            current.raise $! unless Aborted === $!
          end
        end
      end
    end

    def clean
      raise Aborted if alive?
    end
  end

  def initialize(num_threads)
    @num_threads = num_threads
    @threads = []
    @queue = Queue.new
    @mutex = Mutex.new
  end

  def init(use_mutex = false, &block)
    clean
    num_threads.times do |i|
      @threads << RbbtThreadQueueWorker.new(queue, use_mutex ? mutex : nil, &block)
    end
  end

  def join
    while queue.length > 0 or queue.num_waiting < @threads.length
      Thread.pass 
      raise "No worker thread survived" if @threads.empty? and queue.length > 0
    end
    @threads.delete_if{|t| t.alive?}
    @threads.each{|t| t.raise Aborted } 
    @threads.each{|t| t.join(0.1) } 
  end

  def clean
    threads.each{ |t| t.clean }.clear
  end

  def process(e)
    queue << e
  end


  def self.each(list, num = 3, &block)
    q = RbbtThreadQueue.new num
    q.init(&block)
    list.each do |elem| q.process elem end
    q.join
  end
end

Version data entries

10 entries across 10 versions & 1 rubygems

Version Path
rbbt-util-5.8.10 lib/rbbt/util/concurrency/threads.rb
rbbt-util-5.8.9 lib/rbbt/util/concurrency/threads.rb
rbbt-util-5.8.8 lib/rbbt/util/concurrency/threads.rb
rbbt-util-5.8.7 lib/rbbt/util/concurrency/threads.rb
rbbt-util-5.8.6 lib/rbbt/util/concurrency/threads.rb
rbbt-util-5.8.4 lib/rbbt/util/concurrency/threads.rb
rbbt-util-5.8.3 lib/rbbt/util/concurrency/threads.rb
rbbt-util-5.8.2 lib/rbbt/util/concurrency/threads.rb
rbbt-util-5.8.1 lib/rbbt/util/concurrency/threads.rb
rbbt-util-5.8.0 lib/rbbt/util/concurrency/threads.rb