Sha256: 9d2ac4c7980f17769d0eb46df7d18a54391dbb67ed4d2106e9b396edc6c9ebaf

Contents?: true

Size: 1.35 KB

Versions: 4

Compression:

Stored size: 1.35 KB

Contents

require 'hadoop_dsl'
require 'enumerator'

# Word-count DSL for HadoopDsl: a mapper that emits per-word counts and
# optional "total" records (bytes / words / lines), and a reducer that
# aggregates either the per-word keys or the total keys depending on which
# DSL call is active.
module HadoopDsl::WordCount
  # Model methods exposed to DSL scripts via delegation (none for word count).
  MODEL_METHODS = []

  # Key prefix that marks "total" records. Words produced by String#split
  # never contain whitespace, so a tab-prefixed key cannot collide with a
  # word-count key.
  TOTAL_PREFIX = "\t".freeze

  # Matches keys beginning with TOTAL_PREFIX. \A anchors at the very start of
  # the key (^ would also match right after any embedded newline), and the
  # pattern is built once here instead of being re-interpolated per call.
  TOTAL_KEY_PATTERN = /\A#{Regexp.escape(TOTAL_PREFIX)}/

  # controller
  #
  # Mapper side of the word-count DSL. Wraps (key, value) in a
  # WordCountMapperModel and passes it to HadoopDsl::BaseMapper.
  # NOTE(review): assumes BaseMapper supplies #emit and extends Forwardable
  # (needed for def_delegators) -- confirm against hadoop_dsl.
  class WordCountMapper < HadoopDsl::BaseMapper
    def initialize(script, key, value)
      super(script, WordCountMapperModel.new(key, value))
    end

    # model methods
    def_delegators :@model, *MODEL_METHODS

    # emitters

    # Splits the input value on whitespace and emits `word => 1` for every
    # word, including repeats.
    def count_uniq
      @model.value.split.each { |word| emit(word => 1) }
    end

    # Emits one TOTAL_PREFIX-keyed record per requested type:
    #   :bytes -> byte count of the value with all whitespace removed
    #   :words -> number of whitespace-separated words
    #   :lines -> 1 (one record per input line)
    # Unrecognised types are ignored.
    def total(*types)
      types.each do |type|
        case type
        when :bytes
          # bytesize (not length) so multibyte characters count as bytes on
          # Ruby 1.9+, matching the "total bytes" label.
          emit("#{TOTAL_PREFIX}total bytes" => @model.value.gsub(/\s/, '').bytesize)
        when :words
          emit("#{TOTAL_PREFIX}total words" => @model.value.split.size)
        when :lines
          emit("#{TOTAL_PREFIX}total lines" => 1)
        end
      end
    end
  end

  # Reducer side of the word-count DSL. Wraps (key, values) in a
  # WordCountReducerModel and passes it to HadoopDsl::BaseReducer.
  # NOTE(review): #aggregate comes from BaseReducer; presumably it sums the
  # values for the current key -- confirm against hadoop_dsl.
  class WordCountReducer < HadoopDsl::BaseReducer
    def initialize(script, key, values)
      super(script, WordCountReducerModel.new(key, values))
    end

    # model methods
    def_delegators :@model, *MODEL_METHODS

    # emitters

    # Aggregates only ordinary word keys; total records are left to #total.
    def count_uniq
      aggregate unless @model.total_value?
    end

    # Aggregates only TOTAL_PREFIX-keyed records. The requested types are
    # ignored here because the mapper already encoded them in the key.
    def total(*types)
      aggregate if @model.total_value?
    end
  end

  # model
  class WordCountMapperModel < HadoopDsl::BaseMapperModel
  end

  class WordCountReducerModel < HadoopDsl::BaseReducerModel
    # True when the current key is a "total" record (starts with
    # TOTAL_PREFIX). to_s guards against nil or non-String keys.
    def total_value?
      !!(@key.to_s =~ TOTAL_KEY_PATTERN)
    end
  end
end

Version data entries

4 entries across 4 versions & 2 rubygems

Version Path
hadoop-rubydsl-0.0.6 lib/word_count.rb
hadoop-papyrus-0.0.6 lib/word_count.rb
hadoop-rubydsl-0.0.5 lib/word_count.rb
hadoop-rubydsl-0.0.4 lib/word_count.rb