Sha256: 7c0f8d163d18b030f29128fabae4537037c91135184225990cfd9c4f16ab9150

Contents?: true

Size: 1.28 KB

Versions: 11

Compression:

Stored size: 1.28 KB

Contents

# frozen_string_literal: true

module Mutant
  module Parallel
    class Worker
      include Adamantium::Flat, Anima.new(
        :processor,
        :var_active_jobs,
        :var_final,
        :var_running,
        :var_sink,
        :var_source
      )

      private(*anima.attribute_names)

      # Run worker payload
      #
      # @return [self]
      #
      # ignore :reek:TooManyStatements
      def call
        loop do
          job = next_job or break

          job_start(job)

          result = processor.call(job.payload)

          job_done(job)

          break if add_result(result)
        end

        finalize

        self
      end

    private

      def next_job
        var_source.with do |source|
          source.next if source.next?
        end
      end

      def add_result(result)
        var_sink.with do |sink|
          sink.result(result)
          sink.stop?
        end
      end

      def job_start(job)
        var_active_jobs.with do |active_jobs|
          active_jobs << job
        end
      end

      def job_done(job)
        var_active_jobs.with do |active_jobs|
          active_jobs.delete(job)
        end
      end

      def finalize
        var_final.put(nil) if var_running.modify(&:pred).zero?
      end

    end # Worker
  end # Parallel
end # Mutant

Version data entries

11 entries across 11 versions & 1 rubygems

Version Path
mutant-0.10.6 lib/mutant/parallel/worker.rb
mutant-0.10.5 lib/mutant/parallel/worker.rb
mutant-0.10.4 lib/mutant/parallel/worker.rb
mutant-0.10.1 lib/mutant/parallel/worker.rb
mutant-0.10.0 lib/mutant/parallel/worker.rb
mutant-0.9.14 lib/mutant/parallel/worker.rb
mutant-0.9.13 lib/mutant/parallel/worker.rb
mutant-0.9.12 lib/mutant/parallel/worker.rb
mutant-0.9.11 lib/mutant/parallel/worker.rb
mutant-0.9.10 lib/mutant/parallel/worker.rb
mutant-0.9.9 lib/mutant/parallel/worker.rb