Sha256: cc68dc9265655d21c3baa3636b2ec6966665c0486944bb3c0364cbde169520d3
Contents?: true
Size: 1.2 KB
Versions: 9
Compression:
Stored size: 1.2 KB
Contents
# frozen_string_literal: true

module Nanoc::Extra
  # Wraps an enumerable and runs the block given to {#each} / {#map} on a
  # fixed number of worker threads that are fed through a bounded queue.
  #
  # @api private
  class ParallelCollection
    # Sentinel pushed onto the work queue, once per worker, to tell that
    # worker to exit. It is compared by identity (`equal?`), so no element of
    # the wrapped enumerable can be mistaken for it.
    STOP = Object.new

    include Nanoc::Int::ContractsSupport

    # @param enum [#each] the collection whose elements are processed
    # @param parallelism [Integer] number of worker threads started by {#each}
    contract C::RespondTo[:each], C::KeywordArgs[parallelism: Integer] => C::Any
    def initialize(enum, parallelism: 2)
      @enum = enum
      @parallelism = parallelism
    end

    # Yields every element of the wrapped enumerable to the block from one of
    # `parallelism` worker threads. The block may therefore run concurrently
    # with itself, and elements are not guaranteed to be yielded in order.
    #
    # When the block raises a StandardError, the first such error is recorded,
    # no further elements are yielded, and the error is re-raised in the
    # calling thread after all workers have finished.
    #
    # @yieldparam elem [Object] an element of the wrapped enumerable
    # @return [self]
    # @raise [StandardError] the first error raised by the block, if any
    contract C::Func[C::Any => C::Any] => self
    def each
      # Bounded so the producer cannot run arbitrarily far ahead of the workers.
      queue = SizedQueue.new(2 * @parallelism)
      error = nil

      threads = (1..@parallelism).map do
        Thread.new do
          loop do
            elem = queue.pop
            break if STOP.equal?(elem)

            # After a failure, keep draining (without doing any work) until
            # STOP arrives. If workers exited here instead, the producer could
            # block forever pushing onto a full queue that nobody reads.
            next if error

            begin
              yield elem
            rescue => e
              # Keep the first failure; later ones are dropped.
              error ||= e
            end
          end
        end
      end

      begin
        @enum.each do |e|
          # No point in feeding more work once a worker has failed.
          break if error

          queue << e
        end
      ensure
        # Always release and reap the workers, even when enumerating @enum
        # itself raised; otherwise they would stay blocked on queue.pop.
        @parallelism.times { queue << STOP }
        threads.each(&:join)
      end

      raise error if error

      self
    end

    # Runs the block for every element via {#each} and collects the block's
    # return values. Results are appended as workers finish, so the order of
    # the returned array follows completion order, not input order.
    #
    # @yieldparam elem [Object] an element of the wrapped enumerable
    # @return [Array] the block's results
    # @raise [StandardError] see {#each}
    contract C::Func[C::Any => C::Any] => C::RespondTo[:each]
    def map
      [].tap do |all|
        # Guards the shared result array against concurrent appends.
        mutex = Mutex.new

        each do |e|
          res = yield(e)
          mutex.synchronize { all << res }
        end
      end
    end
  end
end
Version data entries
9 entries across 9 versions & 1 rubygems