Sha256: d10ffefe3c8dc3d03fd41a4a6e52dac7021e7538f47e9b1127e2c7b8c62f6c35

Contents?: true

Size: 1.47 KB

Versions: 7

Compression:

Stored size: 1.47 KB

Contents

# frozen_string_literal: true

module Mutant
  module Parallel
    # Handle on a running parallel computation
    #
    # Lets a caller wait (with timeout) for a status snapshot and stop
    # the underlying threads.
    class Driver
      include Anima.new(
        :threads,
        :var_active_jobs,
        :var_final,
        :var_running,
        :var_sink,
        :var_source,
        :workers
      )

      private(*anima.attribute_names)

      # Initialize driver, marked as alive
      #
      # @param [Hash{Symbol => Object}] attributes
      #   passed through unchanged via `super`
      #
      # @return [undefined]
      def initialize(**attributes)
        @alive = true
        super
      end

      # Block for up to the timeout, then report the current status
      #
      # The wait on `var_final` is skipped once #stop was called.
      #
      # @param [Float] timeout
      #
      # @return [Status]
      #   snapshot of active jobs, done flag and sink status payload
      def wait_timeout(timeout)
        if @alive
          var_final.take_timeout(timeout)
        end

        finalize(status)
      end

      # Halt the parallel computation
      #
      # Marks the driver as no longer alive and kills every thread.
      #
      # @return [self]
      def stop
        @alive = false
        threads.each(&:kill)
        self
      end

    private

      # Join workers and threads once the computation is done
      #
      # @param [Status] current
      #
      # @return [Status]
      #   the status that was passed in
      def finalize(current)
        return current unless current.done?

        workers.each(&:join)
        threads.each(&:join)
        current
      end

      # Build a status snapshot while holding active jobs and sink
      #
      # @return [Status]
      def status
        var_active_jobs.with do |jobs|
          var_sink.with do |sink|
            Status.new(
              active_jobs: jobs.dup.freeze,
              # done as soon as no thread is alive anymore
              done:        threads.none?(&:alive?),
              payload:     sink.status
            )
          end
        end
      end
    end # Driver
  end # Parallel
end # Mutant

Version data entries

7 entries across 7 versions & 1 rubygems

Version Path
mutant-0.11.16 lib/mutant/parallel/driver.rb
mutant-0.11.15 lib/mutant/parallel/driver.rb
mutant-0.11.14 lib/mutant/parallel/driver.rb
mutant-0.11.13 lib/mutant/parallel/driver.rb
mutant-0.11.12 lib/mutant/parallel/driver.rb
mutant-0.11.11 lib/mutant/parallel/driver.rb
mutant-0.11.10 lib/mutant/parallel/driver.rb