Sha256: 6aca35f88395888430a7ecd5bc930bc4e2e81d84ccbbaa81cd11489ee55d778f

Contents?: true

Size: 727 Bytes

Versions: 2

Compression:

Stored size: 727 Bytes

Contents

# -*- encoding: binary -*-
require 'thread'

module Rainbows

  # A fixed-size pool of worker threads that all consume jobs from one
  # shared Ruby Queue.  This is NOT used for the ThreadPool class, since
  # that class does not need a userspace Queue.
  #
  # Struct members:
  # queue   - the Queue the workers take jobs from
  # threads - the Array of worker Thread objects
  class QueuePool < Struct.new(:queue, :threads)
    G = Rainbows::G

    # Starts +size+ worker threads (20 unless told otherwise).  Every
    # worker hands each job it takes off the queue to +block+ and leaves
    # its loop as soon as it takes a nil or false job.
    def initialize(size = 20, &block)
      jobs = Queue.new
      self.queue = jobs

      # The workers close over the local Queue rather than going through
      # the struct accessor.
      worker = lambda do
        while job = jobs.pop
          block.call(job)
        end
      end

      self.threads = (1..size).map { Thread.new(&worker) }
    end

    # Shuts the pool down: queues one nil per worker so that each worker
    # loop terminates, then keeps polling until every worker thread has
    # been removed from +threads+.
    def quit!
      threads.size.times { queue << nil }

      until threads.empty?
        threads.delete_if do |worker|
          # NOTE(review): G.tick is defined outside this file; presumably
          # a liveness heartbeat -- confirm against Rainbows::G.
          G.tick
          # Dead workers are dropped immediately; live ones get up to
          # 0.01s to finish (join returns nil on timeout, keeping them
          # for the next pass).
          !worker.alive? || worker.join(0.01)
        end
      end
    end
  end
end

Version data entries

2 entries across 2 versions & 1 rubygems

Version Path
rainbows-0.94.0 lib/rainbows/queue_pool.rb
rainbows-0.93.0 lib/rainbows/queue_pool.rb