Sha256: cdc3bd701a8dcbce87cb63556e17900c906b25459e8af0087aef9fb1fc0efedd

Contents?: true

Size: 1.18 KB

Versions: 5

Compression:

Stored size: 1.18 KB

Contents

require 'thread'

module Nanoc::Extra
  # @api private
  class ParallelCollection
    STOP = Object.new

    include Nanoc::Int::ContractsSupport

    contract C::RespondTo[:each], C::KeywordArgs[parallelism: Fixnum] => C::Any
    def initialize(enum, parallelism: 2)
      @enum = enum
      @parallelism = parallelism
    end

    contract C::Func[C::Any => C::Any] => self
    def each
      queue = SizedQueue.new(2 * @parallelism)
      error = nil

      threads = (1..@parallelism).map do
        Thread.new do
          loop do
            begin
              elem = queue.pop
              break if error
              break if STOP.equal?(elem)
              yield elem
            rescue => err
              error = err
              break
            end
          end
        end
      end

      @enum.each { |e| queue << e }
      @parallelism.times { queue << STOP }

      threads.each(&:join)

      raise error if error
      self
    end

    contract C::Func[C::Any => C::Any] => C::RespondTo[:each]
    def map
      [].tap do |all|
        mutex = Mutex.new
        each do |e|
          res = yield(e)
          mutex.synchronize { all << res }
        end
      end
    end
  end
end

Version data entries

5 entries across 5 versions & 1 rubygems

Version Path
nanoc-4.5.2 lib/nanoc/extra/parallel_collection.rb
nanoc-4.5.1 lib/nanoc/extra/parallel_collection.rb
nanoc-4.5.0 lib/nanoc/extra/parallel_collection.rb
nanoc-4.4.7 lib/nanoc/extra/parallel_collection.rb
nanoc-4.4.6 lib/nanoc/extra/parallel_collection.rb