Sha256: 61b52c823b493da75f6d48a980cc872a6de6bd2ed81be02375123a3ec9fcc790

Size: 1.01 KB

Versions: 2

Stored size: 1.01 KB

Contents

module Wukong
  module Streamer
    #
    # UniqByLastReducer acts like an insecure high-school kid: for each key it
    # adopts, in turn, the latest value seen. It then emits the last value (in
    # sort order) for that key.
    #
    # For example, to extract the *latest* value for each property, set Hadoop
    # to use <resource, item_id, timestamp> as sort fields and <resource,
    # item_id> as key fields.
    #
    class UniqByLastReducer < Wukong::Streamer::AccumulatingReducer
      attr_accessor :final_value

      #
      # Use the first two fields as the key by default
      #
      def get_key *vals
        vals[0..1]
      end

      #
      # Adopt each value in turn: the last one's the one you want.
      #
      def accumulate *vals
        self.final_value = vals
      end

      #
      # Emit the last-seen value
      #
      def finalize
        yield final_value if final_value
      end

      #
      # Clear the stored value when a new key group starts
      #
      def start! *args
        self.final_value = nil
      end
    end
  end
end
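
The reducer above relies on its input arriving sorted, so that the last record seen for a key is the one to keep. The following is a minimal standalone sketch of that same idea in plain Ruby. It does not load the wukong gem or reproduce its streaming machinery. The method name `uniq_by_last`, the `key_size` parameter, and the sample records are all hypothetical, chosen only for illustration.

```ruby
# Standalone sketch of the "keep the last record per key" technique.
# Hypothetical helper: not part of Wukong. Assumes records are already
# sorted so that all records sharing a key are adjacent and the desired
# record comes last within its group.
def uniq_by_last(sorted_records, key_size: 2)
  current_key = nil
  final_value = nil
  results     = []

  sorted_records.each do |record|
    key = record.first(key_size)          # mirrors get_key: first two fields
    if key != current_key
      results << final_value if final_value  # mirrors finalize for the old group
      final_value = nil                      # mirrors start!: reset state
      current_key = key
    end
    final_value = record                  # mirrors accumulate: adopt latest
  end

  results << final_value if final_value   # emit the final group
  results
end

# Hypothetical records shaped as <resource, item_id, timestamp, value>.
records = [
  ["user",  "1", "20090101", "alice"],
  ["user",  "1", "20090301", "alice_b"],
  ["user",  "2", "20090201", "bob"],
  ["tweet", "9", "20090105", "hello"],
  ["tweet", "9", "20090106", "hello again"],
]

# Array#sort compares field by field, which stands in here for Hadoop
# sorting on <resource, item_id, timestamp>.
latest = uniq_by_last(records.sort)
latest.each { |record| puts record.join("\t") }
```

Each group is emitted only when the key changes or the input ends, so memory use stays at one record per reducer regardless of group size. That is the same trade-off the class makes by storing a single `final_value`.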

Version data entries

2 entries across 2 versions & 2 rubygems

Version Path
mrflip-wukong-0.1.0 lib/wukong/streamer/uniq_by_last_reducer.rb
wukong-0.1.1 lib/wukong/streamer/uniq_by_last_reducer.rb