Sha256: 27d1b12df1e001f55c8c17c02be4856f89985bda8ecee507a6c5d36b74f1f1bb

Contents?: true

Size: 805 Bytes

Versions: 4

Compression:

Stored size: 805 Bytes

Contents

module Wukong
  module Streamer
    #
    # Roll up all records from a given key into a single list.
    #
    class ListReducer < Wukong::Streamer::AccumulatingReducer
      attr_accessor :values

      # start with an empty list
      def start! *args
        @values = []
      end

      # aggregate all records.
      # note that this accumulates the full *record* -- key, value, everything.
      def accumulate *record
        @values << record
      end

      # emit the key and all records; the records are flattened and joined with ";"
      #
      # you will almost certainly want to override this method to do something
      # interesting with the values (or override accumulate to gather scalar
      # values)
      #
      def finalize
        yield [key, @values.to_flat.join(";")].flatten
      end
    end
  end
end
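
The start!/accumulate/finalize life cycle above can be tried without the gem. What follows is a minimal sketch, not Wukong's implementation. `SketchListReducer`, its `reduce` driver, and `ValueOnlyReducer` are hypothetical names invented for illustration. Plain `Array#flatten` stands in for the `to_flat` helper, and the input is assumed to arrive already sorted by key, with the key as the first field.

```ruby
# Hypothetical stand-in for the reducer life cycle; not part of Wukong.
class SketchListReducer
  attr_reader :key, :values

  # Start each key group with an empty list.
  def start!(*_args)
    @values = []
  end

  # Accumulate the full record: key, value, everything.
  def accumulate(*record)
    @values << record
  end

  # Emit the key and the accumulated records joined with ";".
  # Assumption: Array#flatten approximates what to_flat does in the original.
  def finalize
    yield [key, @values.flatten.join(";")]
  end

  # Hypothetical driver: walk key-sorted records, one group per key.
  def reduce(records)
    output = []
    records.chunk_while { |a, b| a.first == b.first }.each do |group|
      @key = group.first.first
      start!(*group.first)
      group.each { |record| accumulate(*record) }
      finalize { |row| output << row }
    end
    output
  end
end

# Overriding accumulate to gather only the scalar value,
# as the comment on finalize suggests.
class ValueOnlyReducer < SketchListReducer
  def accumulate(_key, value, *_rest)
    @values << value
  end
end

records = [["a", "1"], ["a", "2"], ["b", "3"]]
full_rows  = SketchListReducer.new.reduce(records)
value_rows = ValueOnlyReducer.new.reduce(records)

# Tab-separate the fields of each emitted row.
full_rows.each  { |row| puts row.join("\t") }
value_rows.each { |row| puts row.join("\t") }
```

Because the base class accumulates whole records, the key is repeated inside the joined list. The overriding subclass keeps only the values, which is usually the more useful output.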

Version data entries

4 entries across 4 versions & 1 rubygem

Version           Path
wukong-3.0.0.pre  old/wukong/streamer/list_reducer.rb
wukong-2.0.2      lib/wukong/streamer/list_reducer.rb
wukong-2.0.1      lib/wukong/streamer/list_reducer.rb
wukong-2.0.0      lib/wukong/streamer/list_reducer.rb