Sha256: 01ecd7c54853b0d1fe58a08219cf4282835f1ef30df4b35efac271efd2398757

Contents?: true

Size: 1.25 KB

Versions: 20

Stored size: 1.25 KB

Contents

module Wukong
  module Streamer
    #
    # UniqByLastReducer accepts all records for a given key and emits only the
    # last one seen.
    #
    # It acts like an insecure high-school kid: for each record of a given key
    # it discards whatever record it's holding and adopts this new value. When a
    # new key comes on the scene it emits the last record, like an older brother
    # handing off his Depeche Mode collection.
    #
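    # For example, to extract the *latest* value for each property, emit your
    # records as
    #
    #    [resource_type, key, timestamp, ... fields ...]
    #
    # then set :partition_fields to 2 and :sort_fields to 3. Records sharing
    # [resource_type, key] then arrive together, ordered by timestamp, so the
    # last record seen for each key is the latest one.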
    #
    class UniqByLastReducer < Wukong::Streamer::AccumulatingReducer
      attr_accessor :final_value

      #
      # Use the first two fields as the key by default
      #
      def get_key *vals
        vals[0..1]
      end

      #
      # Adopt each value in turn: the last one's the one you want.
      #
      def accumulate *vals
        self.final_value = vals
      end

      #
      # Emit the last-seen value
      #
      def finalize
        yield final_value if final_value
      end

      #
      # Clear the held record when a new key group starts
      #
      def start! *args
        self.final_value = nil
      end
    end
  end
end
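
The behavior described in the comments above can be tried without Hadoop or the wukong gem. The following is a minimal standalone sketch, not the gem's implementation: MiniUniqByLast and uniq_by_last are hypothetical names, and the key-change handling in process is an assumption about how the parent AccumulatingReducer drives start!, accumulate and finalize. It expects records that are already grouped by their first two fields and sorted by timestamp.

```ruby
# Standalone sketch of "keep the last record per key"; no Wukong required.
# MiniUniqByLast is a hypothetical stand-in, not a class from the gem.
class MiniUniqByLast
  def initialize
    @key = :__first_pass__ # sentinel meaning "no key seen yet"
    @final_value = nil
  end

  # Key on the first two fields, mirroring UniqByLastReducer#get_key.
  def get_key(*vals)
    vals[0..1]
  end

  # Feed one record. When the key changes, the previous group's last
  # record is yielded and the held record is cleared (assumed driver logic).
  def process(*vals, &block)
    this_key = get_key(*vals)
    if this_key != @key
      finalize(&block)
      @key = this_key
      @final_value = nil
    end
    @final_value = vals # adopt each record in turn
  end

  # Yield the held record, if any.
  def finalize
    yield @final_value if @final_value
  end
end

# Run the sketch over an array of pre-sorted records and collect the output.
def uniq_by_last(records)
  reducer = MiniUniqByLast.new
  out = []
  records.each { |rec| reducer.process(*rec) { |last| out << last } }
  reducer.finalize { |last| out << last } # flush the final group
  out
end

# Made-up sample data in [resource_type, key, timestamp, field] form.
records = [
  ['user', 'alice', 1, 'red'],
  ['user', 'alice', 2, 'blue'],
  ['user', 'bob',   1, 'green'],
  ['page', 'home',  5, 'v1'],
  ['page', 'home',  9, 'v2']
]

uniq_by_last(records).each { |rec| puts rec.join("\t") }
```

Only the record with the highest timestamp in each [resource_type, key] group is printed. If the input were not sorted, the same key could show up in more than one run and be emitted more than once, which is why the partition and sort settings matter.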

Version data entries

20 entries across 20 versions and 1 rubygem

Version Path
wukong-3.0.0.pre old/wukong/streamer/uniq_by_last_reducer.rb
wukong-2.0.2 lib/wukong/streamer/uniq_by_last_reducer.rb
wukong-2.0.1 lib/wukong/streamer/uniq_by_last_reducer.rb
wukong-2.0.0 lib/wukong/streamer/uniq_by_last_reducer.rb
wukong-1.5.4 lib/wukong/streamer/uniq_by_last_reducer.rb
wukong-1.5.3 lib/wukong/streamer/uniq_by_last_reducer.rb
wukong-1.5.2 lib/wukong/streamer/uniq_by_last_reducer.rb
wukong-1.5.1 lib/wukong/streamer/uniq_by_last_reducer.rb
wukong-1.5.0 lib/wukong/streamer/uniq_by_last_reducer.rb
wukong-1.4.12 lib/wukong/streamer/uniq_by_last_reducer.rb
wukong-1.4.11 lib/wukong/streamer/uniq_by_last_reducer.rb
wukong-1.4.10 lib/wukong/streamer/uniq_by_last_reducer.rb
wukong-1.4.9 lib/wukong/streamer/uniq_by_last_reducer.rb
wukong-1.4.7 lib/wukong/streamer/uniq_by_last_reducer.rb
wukong-1.4.6 lib/wukong/streamer/uniq_by_last_reducer.rb
wukong-1.4.5 lib/wukong/streamer/uniq_by_last_reducer.rb
wukong-1.4.2 lib/wukong/streamer/uniq_by_last_reducer.rb
wukong-1.4.1 lib/wukong/streamer/uniq_by_last_reducer.rb
wukong-1.4.0 lib/wukong/streamer/uniq_by_last_reducer.rb
wukong-0.1.4 lib/wukong/streamer/uniq_by_last_reducer.rb