Sha256: b945b0c875d88a6ef662af0a09c2ff26785f9d27b22e55a3b71a51747393c028

Contents?: true

Size: 940 Bytes

Versions: 4

Compression:

Stored size: 940 Bytes

Contents

module Streamingly
  # Streams tab-separated "key<TAB>value" lines through accumulator objects,
  # one accumulator per run of consecutive lines that share the same key.
  #
  # The accumulator class is used as follows by this reducer:
  # * +new(key)+ or +new(key, options)+ -- called whenever the key changes
  #   (the options form is used only when options were given to the reducer);
  # * +apply_value(value)+ -- called once per input line;
  # * +flush+ -- called when the key changes and at the end of the input; its
  #   result is iterated with +each+ and every element is yielded.
  class Reducer

    # @param accumulator_class [Class] class instantiated for each new key
    # @param accumulator_options [Object, nil] optional second argument passed
    #   to +accumulator_class.new+; omitted from the call when nil/false
    def initialize(accumulator_class, accumulator_options=nil)
      @accumulator_class = accumulator_class
      @accumulator_options = accumulator_options
      @accumulator = nil
      @prev_key = nil
    end

    # Feeds every line of +enumerator+ through the reducer, yielding each
    # record produced by the accumulators' +flush+.
    #
    # Each line is stripped of surrounding whitespace (which includes leading
    # and trailing tabs) before being split into key and value.
    #
    # @param enumerator [#each] source of String lines
    # @yield [out] each record returned by an accumulator's +flush+
    # @return [Enumerator] when no block is given
    def reduce_over(enumerator)
      return enum_for(:reduce_over, enumerator) unless block_given?

      enumerator.each do |line|
        reduce(line.strip).each do |out|
          yield out
        end
      end

      flush.each do |out|
        yield out
      end
    ensure
      # Drop per-run state so that a later call starts clean instead of
      # re-flushing (or appending to) the last accumulator of this run.
      reset
    end

  private

    # Forgets the current key and accumulator.
    def reset
      @accumulator = nil
      @prev_key = nil
    end

    # Returns the pending output of the current accumulator, or an empty
    # array when no accumulator has been created yet.
    def flush
      @accumulator ? @accumulator.flush : []
    end

    # Processes one already-stripped line and returns the records that became
    # complete because of it (non-empty only when the key changed).
    def reduce(line)
      # Only the first tab separates key from value; a limit of 2 keeps any
      # further tabs as part of the value instead of discarding the tail.
      key, value = line.split("\t", 2)

      # Checking for a missing accumulator (not just a changed key) matters
      # for the first line: a blank line yields a nil key, which would equal
      # the initial nil @prev_key and leave @accumulator unset.
      if @accumulator.nil? || @prev_key != key
        results = flush

        @prev_key = key
        @accumulator = new_accumulator(key)
      end

      @accumulator.apply_value(value)

      results || []
    end

    # Builds an accumulator for +key+, passing the options only if present.
    def new_accumulator(key)
      if @accumulator_options
        @accumulator_class.new(key, @accumulator_options)
      else
        @accumulator_class.new(key)
      end
    end
  end
end

Version data entries

4 entries across 4 versions & 1 rubygem

Version Path
streamingly-0.0.5 lib/streamingly/reducer.rb
streamingly-0.0.4 lib/streamingly/reducer.rb
streamingly-0.0.3.pre lib/streamingly/reducer.rb
streamingly-0.0.2 lib/streamingly/reducer.rb