Sha256: ff5d440cbc2952b6d35ff65c92c3282fb74df88a9b6d27c1c6886d6ffeb9ccb3
Contents?: true
Size: 693 Bytes
Versions: 22
Compression:
Stored size: 693 Bytes
Contents
module Wukong module Streamer # # Emit each unique key and the count of its occurrences # class SummingReducer < Wukong::Streamer::AccumulatingReducer attr_accessor :summing_elements attr_accessor :sums # reset the counter to zero def start! *args self.sums = summing_elements.map{ 0 } end # record one more for this key def accumulate *fields vals = fields.values_at( *summing_elements ) vals.each_with_index{|val,idx| self.sums[idx] += val.to_i } end # emit each key field and the count, tab-separated. def finalize yield [key, sums].flatten end end end end
Version data entries
22 entries across 22 versions & 2 rubygems