Sha256: fb024d898ec5c0c16292a4574ac823803b904f6723e6e0e12651f6625fbe216f

Contents?: true

Size: 1.01 KB

Versions: 4

Compression:

Stored size: 1.01 KB

Contents

require 'thread'

class InThreads
  class Filler
    class Extractor
      include Enumerable

      def initialize(filler)
        @filler = filler
        @queue = []
      end

      def push(o)
        @queue.push(o)
      end

      def each
        begin
          loop do
            while @filler.synchronize{ @queue.empty? }
              @filler.run
            end
            yield @filler.synchronize{ @queue.shift }
          end
        rescue ThreadError => e
        end
        nil # non reusable
      end
    end

    attr_reader :extractors
    def initialize(enum, extractor_count)
      @extractors = Array.new(extractor_count){ Extractor.new(self) }
      @mutex = Mutex.new
      @filler = Thread.new do
        enum.each do |o|
          synchronize do
            @extractors.each do |extractor|
              extractor.push(o)
            end
          end
        end
      end
    end

    def run
      @filler.run
    end

    def synchronize(&block)
      @mutex.synchronize(&block)
    end
  end
end

Version data entries

4 entries across 4 versions & 1 rubygems

Version Path
in_threads-1.2.2 lib/in_threads/filler.rb
in_threads-1.2.1 lib/in_threads/filler.rb
in_threads-1.2.0 lib/in_threads/filler.rb
in_threads-1.1.2 lib/in_threads/filler.rb