Sha256: 1cd9f03fe1a550f9c8241bed26b9b4ea387cbf5b7fef309794689c057703b969
Contents?: true
Size: 1.67 KB
Versions: 3
Compression:
Stored size: 1.67 KB
Contents
require 'core'
require 'enumerator'

# Word-count DSL: mapper/reducer controllers plus the models that back them.
module HadoopDsl::WordCount
  include HadoopDsl

  # Model methods that both controllers expose through def_delegators.
  AVAILABLE_METHODS = [:count_uniq, :total]

  # Prefix put on every key emitted by `total`. The reducer model checks for
  # it to tell "total" keys apart from plain word keys. Words come from
  # String#split (whitespace-delimited), so a word key never starts with a tab.
  TOTAL_PREFIX = "\t"

  # common
  # Behaviour shared by the mapper and reducer controllers.
  module WordCountMapRed
    # entry point
    # Yields to the given block. `description` is accepted but not used here.
    def data(description = '', &block)
      yield
    end
  end

  # controller
  # Mapper controller: hands the key/value pair to a WordCountMapperModel and
  # delegates the DSL methods to it.
  class WordCountMapper < BaseMapper
    def initialize(script, key, value)
      super(script, WordCountMapperModel.new(key, value))
    end

    include WordCountMapRed

    # model methods
    def_delegators :@model, *AVAILABLE_METHODS
  end

  # Reducer controller: hands the key/values pair to a WordCountReducerModel
  # and delegates the DSL methods to it.
  class WordCountReducer < BaseReducer
    def initialize(script, key, values)
      super(script, WordCountReducerModel.new(key, values))
    end

    include WordCountMapRed

    # model methods
    def_delegators :@model, *AVAILABLE_METHODS
  end

  # model
  # Map-side model: turns one input value into emitted key => count pairs.
  class WordCountMapperModel < BaseMapperModel
    def initialize(key, value)
      super(key, value)
    end

    # emitters

    # Emits `word => 1` for every whitespace-separated token in @value.
    def count_uniq
      @value.split.each { |word| @controller.emit(word => 1) }
    end

    # Emits one TOTAL_PREFIX-ed counter per requested type.
    #
    # types - any of :bytes, :words, :lines. Other values are silently ignored.
    def total(*types)
      types.each do |type|
        case type
        when :bytes
          # NOTE(review): whitespace is stripped before counting, and
          # String#length counts characters (not bytes) on Ruby 1.9+.
          # Switch to bytesize if true byte totals are intended.
          @controller.emit("#{TOTAL_PREFIX}total bytes" => @value.gsub(/\s/, '').length)
        when :words
          @controller.emit("#{TOTAL_PREFIX}total words" => @value.split.size)
        when :lines
          # Emits 1 per call; presumably each map input value is one line.
          @controller.emit("#{TOTAL_PREFIX}total lines" => 1)
        end
      end
    end
  end

  # Reduce-side model: aggregates either word keys or total keys, depending on
  # which DSL method is called. `aggregate` is not defined in this file;
  # presumably it comes from BaseReducerModel.
  class WordCountReducerModel < BaseReducerModel
    def initialize(key, values)
      super(key, values)
    end

    # emitters

    # Aggregates only keys that are NOT total keys.
    def count_uniq
      aggregate unless total_value?
    end

    # Aggregates only total keys. `types` is accepted for symmetry with the
    # mapper model but is not inspected here.
    def total(*types)
      aggregate if total_value?
    end

    private

    # Truthy (match index 0) when @key begins with TOTAL_PREFIX, nil otherwise.
    # Anchored with \A rather than ^ so that only the very start of the key is
    # tested, not the start of any embedded line.
    def total_value?
      @key =~ /\A#{TOTAL_PREFIX}/
    end
  end
end
Version data entries
3 entries across 3 versions & 1 rubygems
Version | Path |
---|---|
hadoop-rubydsl-0.0.3 | lib/word_count.rb |
hadoop-rubydsl-0.0.2 | lib/word_count.rb |
hadoop-rubydsl-0.0.1 | lib/word_count.rb |