Sha256: c407b1c45b3bd384077d7cb21ecfe9cad5c275398c97b42fdd9eef14ce756d22

Contents?: true

Size: 1.96 KB

Versions: 2

Compression:

Stored size: 1.96 KB

Contents

module Kraps
  # Builder that records an ordered list of processing steps. Each DSL method
  # (parallelize, map, reduce, each_partition, repartition) leaves the receiver
  # untouched and returns a copy whose step list has exactly one more Step.
  class Job
    # @return [Array<Step>] the steps recorded so far, in the order they were added
    attr_reader :steps

    # @param worker [Object] default worker stored in the args of every step
    #   that does not pass its own `worker:`
    def initialize(worker:)
      @worker = worker
      @steps = []
      @partitions = 0
      @partitioner = MapReduce::HashPartitioner.new(@partitions)
    end

    # Adds a PARALLELIZE step and fixes the partitioning used by this and
    # subsequent steps.
    #
    # @param partitions [Object] partition count recorded on the new job
    # @param partitioner [Object] defaults to MapReduce::HashPartitioner.new(partitions)
    # @param worker [Object] worker for this step (defaults to the job's worker)
    # @return [Job] a new job with the step appended
    def parallelize(partitions:, partitioner: MapReduce::HashPartitioner.new(partitions), worker: @worker, &block)
      job = fresh
      job.assign_partitioning(partitions, partitioner)
      job.append_step(Actions::PARALLELIZE, worker, block)
    end

    # Adds a MAP step, optionally switching the partitioning first.
    #
    # If `partitions` is given without `partitioner`, a new
    # MapReduce::HashPartitioner over `partitions` is used. If only
    # `partitioner` is given, the partition count is left as it was. If
    # neither is given, the current partitioning carries over unchanged.
    #
    # @return [Job] a new job with the step appended
    def map(partitions: nil, partitioner: nil, worker: @worker, &block)
      job = fresh

      if partitions || partitioner
        job.assign_partitioning(
          partitions || @partitions,
          partitioner || MapReduce::HashPartitioner.new(partitions)
        )
      end

      job.append_step(Actions::MAP, worker, block)
    end

    # Adds a REDUCE step using the current partitioning.
    #
    # @return [Job] a new job with the step appended
    def reduce(worker: @worker, &block)
      fresh.append_step(Actions::REDUCE, worker, block)
    end

    # Adds an EACH_PARTITION step using the current partitioning.
    #
    # @return [Job] a new job with the step appended
    def each_partition(worker: @worker, &block)
      fresh.append_step(Actions::EACH_PARTITION, worker, block)
    end

    # Changes the partitioning via a MAP step whose block forwards every
    # (key, value) pair unchanged to the collector.
    #
    # @return [Job] a new job with the step appended
    def repartition(partitions:, partitioner: nil, worker: @worker)
      passthrough = proc { |key, value, collector| collector.call(key, value) }
      map(partitions: partitions, partitioner: partitioner, worker: worker, &passthrough)
    end

    # @return [Job] a `dup` of this job that owns a separate copy of the steps
    #   array. The Step objects themselves and the other instance variables are
    #   shared with the original.
    def fresh
      copy = dup
      copy.instance_variable_set(:@steps, @steps.dup)
      copy
    end

    protected

    # Overwrites both partitioning settings on this job; returns self.
    def assign_partitioning(partitions, partitioner)
      @partitions = partitions
      @partitioner = partitioner
      self
    end

    # Records a Step carrying the current partitioning and the given worker;
    # returns self so DSL methods can return the result directly.
    def append_step(action, worker, block)
      step_args = { partitions: @partitions, partitioner: @partitioner, worker: worker }
      @steps << Step.new(action: action, args: step_args, block: block)
      self
    end
  end
end

Version data entries

2 entries across 2 versions & 1 rubygem

Version Path
kraps-0.2.0 lib/kraps/job.rb
kraps-0.1.0 lib/kraps/job.rb