lib/log_analysis.rb in hadoop-rubydsl-0.0.3 vs lib/log_analysis.rb in hadoop-rubydsl-0.0.4

- old
+ new

@@ -1,107 +1,146 @@ -require 'core' +require 'hadoop_dsl' require 'enumerator' module HadoopDsl::LogAnalysis - include HadoopDsl - KEY_SEP = "\t" PREFIX = 'col' PASS = nil - AVAILABLE_METHODS = [:separate, :pattern, :column_name, :column, :topic, :value, :count_uniq, :sum] + MODEL_METHODS = [:column, :value] - # common - module LogAnalysisMapRed - # entry point - def data(description = '', &block) yield end + # controller + class LogAnalysisMapper < HadoopDsl::BaseMapper + def initialize(script, key, value) + super(script, LogAnalysisMapperModel.new(key, value)) + end - def each_line(&block) yield end - end + # model methods + def_delegators :@model, *MODEL_METHODS + + def topic(desc, options = {}, &block) + @model.create_topic(desc, options) + yield if block_given? + current_topic + end - # controller - class LogAnalysisSetup < BaseSetup - def initialize(script, conf) - super(script, conf) + def separate(sep) + parts = value.split(sep) + @model.create_or_replace_columns_with(parts) {|column, value| column.value = value} end - include LogAnalysisMapRed - end + def pattern(re) + if value =~ re + md = Regexp.last_match + @model.create_or_replace_columns_with(md.captures) {|column, value| column.value = value} + end + end - class LogAnalysisMapper < BaseMapper - def initialize(script, key, value) - super(script, LogAnalysisMapperModel.new(key, value)) + # column names by String converted to Symbol + def column_name(*names) + sym_names = names.map {|name| name.is_a?(String) ? name.to_sym : name } + @model.create_or_replace_columns_with(sym_names) {|column, name| column.name = name} end - include LogAnalysisMapRed + def group_by(column_or_value) + case column_or_value + when LogAnalysisMapperModel::Column + column = column_or_value + current_topic.key_elements << column.value + else + value = column_or_value + current_topic.key_elements << value + end + end - # model methods - def_delegators :@model, *AVAILABLE_METHODS + def group_date_by(column, term) + require 'time' + time = parse_time(column.value) + time_key = case term + when :daily then time.strftime('%Y%m%d') + when :monthly then time.strftime('%Y%m') + when :yearly then time.strftime('%Y') + end + current_topic.key_elements << time_key + end + + # emitters + def count_uniq(column_or_value) + uniq_key = + case column_or_value + when LogAnalysisMapperModel::Column + column = column_or_value + column.value + else column_or_value # value + end + current_topic.key_elements << uniq_key + emit(current_topic.key => 1) + end + + def sum(column) + emit(current_topic.key => column.value.to_i) + end + + private + def current_topic; @model.current_topic end + + def parse_time(str) + begin Time.parse(str) + rescue + # apachelog pattern ex) "10/Oct/2000:13:55:36 -0700" + Time.parse($1) if str =~ /^(\d*\/\w*\/\d*):/ + end + end end - class LogAnalysisReducer < BaseReducer + class LogAnalysisReducer < HadoopDsl::BaseReducer def initialize(script, key, values) super(script, LogAnalysisReducerModel.new(key, values)) end - include LogAnalysisMapRed - # model methods - def_delegators :@model, *AVAILABLE_METHODS + def_delegators :@model, *MODEL_METHODS + + def topic(desc, options = {}, &block) + @model.create_topic(desc, options) + yield if block_given? + @model.current_topic + end + + def count_uniq(column) + aggregate if @model.topic == @model.current_topic + end + + def sum(column) + aggregate if @model.topic == @model.current_topic + end end # model - class LogAnalysisMapperModel < BaseMapperModel + class LogAnalysisMapperModel < HadoopDsl::BaseMapperModel + attr_reader :current_topic + def initialize(key, value) super(key, value) @columns = ColumnArray.new @topics = [] end def column; @columns end - def topic(desc, options = {}, &block) + def create_topic(desc, options) @topics << @current_topic = Topic.new(desc, options[:label]) - yield if block_given? - @current_topic end - def separate(sep) - parts = @value.split(sep) - create_or_replace_columns_with(parts) {|column, value| column.value = value} - end - - def pattern(re) - if @value =~ re - md = Regexp.last_match - create_or_replace_columns_with(md.captures) {|column, value| column.value = value} - end - end - - # column names by String converted to Symbol - def column_name(*names) - sym_names = names.map {|name| name.is_a?(String) ? name.to_sym : name } - create_or_replace_columns_with(sym_names) {|column, name| column.name = name} - end - def create_or_replace_columns_with(array, &block) columns = array.enum_for(:each_with_index).map do |p, i| c = @columns[i] ? @columns[i] : Column.new(i) yield c, p c end @columns = ColumnArray.new(columns) end - # emitters - def count_uniq(column) - @controller.emit([@current_topic.label, KEY_SEP, column.value].join => 1) - end - - def sum(column) - @controller.emit([@current_topic.label].join => column.value.to_i) - end - class ColumnArray < Array def [](key) case key when Integer then at(key) when Symbol then (select {|c| c.name == key}).first @@ -118,39 +157,40 @@ @index, @value = index, value end end class Topic + attr_reader :key_elements + def initialize(desc, label = nil) @desc, @label = desc, label + @key_elements = [] end def label @label || @desc.gsub(/\s/, '_') end + + def key + without_label = + @key_elements.size > 0 ? @key_elements.join(KEY_SEP) : nil + [label, without_label].compact.join(KEY_SEP) + end end end - class LogAnalysisReducerModel < BaseReducerModel + class LogAnalysisReducerModel < HadoopDsl::BaseReducerModel + attr_reader :topic, :current_topic + def initialize(key, values) super(key, values) if key =~ /(\w*)#{KEY_SEP}?(.*)/ @topic = Topic.new($1, values) end end - def topic(desc, options = {}, &block) + def create_topic(desc, options) @current_topic = Topic.new(options[:label] || desc.gsub(/\s/, '_'), nil) - yield if block_given? - @current_topic - end - - def count_uniq(column) - aggregate if @topic == @current_topic - end - - def sum(column) - aggregate if @topic == @current_topic end class Topic attr_reader :label, :values