Sha256: b945b0c875d88a6ef662af0a09c2ff26785f9d27b22e55a3b71a51747393c028
Contents?: true
Size: 940 Bytes
Versions: 4
Compression:
Stored size: 940 Bytes
Contents
module Streamingly
  class Reducer
    def initialize(accumulator_class, accumulator_options = nil)
      @accumulator_class = accumulator_class
      @accumulator_options = accumulator_options
    end

    def reduce_over(enumerator)
      enumerator.each do |line|
        reduce(line.strip).each do |out|
          yield out
        end
      end

      flush.each do |out|
        yield out
      end
    end

    private

    def flush
      @accumulator ? @accumulator.flush : []
    end

    def reduce(line)
      key, value = line.split("\t")

      if @prev_key != key
        results = flush

        @prev_key = key
        @accumulator = new_accumulator(key)
      end

      @accumulator.apply_value(value)

      results || []
    end

    def new_accumulator(key)
      if @accumulator_options
        @accumulator_class.new(key, @accumulator_options)
      else
        @accumulator_class.new(key)
      end
    end
  end
end
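The reducer above only needs three things from an accumulator class: a constructor that takes the key (plus options, if any were given), an `apply_value` method, and a `flush` method that returns an enumerable of output records. The following is a minimal usage sketch under those assumptions. `SumAccumulator` and the sample input are hypothetical and not part of the gem, and the `Reducer` class is repeated from the listing only so the sketch runs on its own.

```ruby
# Reducer as in the listing above, repeated so this sketch is standalone.
module Streamingly
  class Reducer
    def initialize(accumulator_class, accumulator_options = nil)
      @accumulator_class = accumulator_class
      @accumulator_options = accumulator_options
    end

    def reduce_over(enumerator)
      enumerator.each do |line|
        reduce(line.strip).each { |out| yield out }
      end
      flush.each { |out| yield out }
    end

    private

    def flush
      @accumulator ? @accumulator.flush : []
    end

    def reduce(line)
      key, value = line.split("\t")
      if @prev_key != key
        results = flush
        @prev_key = key
        @accumulator = new_accumulator(key)
      end
      @accumulator.apply_value(value)
      results || []
    end

    def new_accumulator(key)
      if @accumulator_options
        @accumulator_class.new(key, @accumulator_options)
      else
        @accumulator_class.new(key)
      end
    end
  end
end

# Hypothetical accumulator (not from the gem): sums integer values per key.
class SumAccumulator
  def initialize(key)
    @key = key
    @total = 0
  end

  # Called once for every value that shares the current key.
  def apply_value(value)
    @total += value.to_i
  end

  # Called when the key changes or the input ends; returns output records.
  def flush
    ["#{@key}\t#{@total}"]
  end
end

# Input is assumed to be tab-separated and already sorted by key.
lines = ["a\t1\n", "a\t2\n", "b\t5\n"]
output = []
Streamingly::Reducer.new(SumAccumulator).reduce_over(lines.each) do |out|
  output << out
end
```

Because the reducer flushes only when the key changes, unsorted input would yield one record per run of equal keys rather than one per distinct key. Passing a second argument to `Reducer.new` causes each accumulator to be built with `new(key, options)` instead of `new(key)`.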
Version data entries
4 entries across 4 versions & 1 rubygems
Version | Path |
---|---|
streamingly-0.0.5 | lib/streamingly/reducer.rb |
streamingly-0.0.4 | lib/streamingly/reducer.rb |
streamingly-0.0.3.pre | lib/streamingly/reducer.rb |
streamingly-0.0.2 | lib/streamingly/reducer.rb |