Sha256: a2c3519ab6c014987474973bcb52c9b94ea759e874217fad131a4cd6ea263b53

Contents?: true

Size: 1.65 KB

Versions: 5

Compression:

Stored size: 1.65 KB

Contents

module Metacrunch
  class Parallel

    module DSL
      def parallel(enumerable, options = {}, &block)
        Parallel.each(enumerable, options, &block)
      end
    end

    def self.each(enumerable, options = {}, &block)
      self.new(enumerable, options, &block).call
    end

    def initialize(enumerable, options = {}, &block)
      @enumerable          = enumerable
      @callable            = block
      @no_of_procs         = options[:in_processes] || 0
      @on_process_finished = options[:on_process_finished] || -> {}

      unless block_given?
        raise ArgumentError, "you must provide a block"
      end

      unless @enumerable.respond_to?(:each)
        raise ArgumentError, "enumerable must respond to each"
      end

      unless @on_process_finished.respond_to?(:call)
        raise ArgumentError, "on_process_finished must respond to call"
      end
    end

    def call
      @enumerable.each do |_value|
        if @no_of_procs == 0
          @callable.call(_value)
          @on_process_finished.call
        else
          fork_process do
            @callable.call(_value)
          end

          if processes_limit_reached?
            wait_for_some_process_to_terminate
            @on_process_finished.call
          end
        end
      end
    ensure
      Process.waitall.each { @on_process_finished.call }
    end

  private

    def fork_process(&block)
      (@pids ||= []).push(fork(&block))
    end

    def processes_limit_reached?
      (@pids || []).length >= @no_of_procs
    end

    def wait_for_some_process_to_terminate
      pid_of_finished_process = Process.wait
      @pids.delete(pid_of_finished_process)
    end

  end
end

Version data entries

5 entries across 5 versions & 1 rubygems

Version Path
metacrunch-2.2.3 lib/metacrunch/parallel.rb
metacrunch-2.2.2 lib/metacrunch/parallel.rb
metacrunch-2.2.1 lib/metacrunch/parallel.rb
metacrunch-2.2.0 lib/metacrunch/parallel.rb
metacrunch-2.1.1 lib/metacrunch/parallel.rb