lib/job.rb in mandy-0.3.6 vs lib/job.rb in mandy-0.3.7

- old
+ new

@@ -15,12 +15,10 @@ def initialize(name, &blk) @name = name @settings = {} @modules = [] - @mapper_class = Mandy::Mappers::PassThroughMapper - @reducer_class = Mandy::Reducers::PassThroughReducer set('mapred.job.name', name) instance_eval(&blk) if blk end def mixin(*modules) @@ -55,38 +53,70 @@ else raise "Unknown store type #{type}" end end + def setup(&blk) + @setup = blk + end + + def teardown(&blk) + @teardown = blk + end + def map(klass=nil, &blk) - @mapper_class = klass || Mandy::Mappers::Base.compile(&blk) - @modules.each {|m| @mapper_class.send(:include, m) } - @mapper_class + @map = klass || blk end def reduce(klass=nil, &blk) - @reducer_class = klass || Mandy::Reducers::Base.compile(&blk) - @modules.each {|m| @reducer_class.send(:include, m) } - @reducer_class + @reduce = klass || blk end def run_map(input=STDIN, output=STDOUT, &blk) - @mapper_class.send(:include, Mandy::IO::OutputFormatting) unless reducer_defined? - mapper = @mapper_class.new(input, output, @input_format, @output_format) + mapper_class.send(:include, Mandy::IO::OutputFormatting) unless reducer_defined? + mapper = mapper_class.new(input, output, @input_format, @output_format) yield(mapper) if blk mapper.execute end def run_reduce(input=STDIN, output=STDOUT, &blk) - reducer = @reducer_class.new(input, output, @input_format, @output_format) + reducer = reducer_class.new(input, output, @input_format, @output_format) yield(reducer) if blk reducer.execute end private + + def mapper_class + return Mandy::Mappers::PassThroughMapper unless @map + @mapper_class ||= compile_map + end + def reducer_class + return Mandy::Reducers::PassThroughReducer unless @reduce + @reducer_class ||= compile_reduce + end + + def compile_map + args = {} + args[:setup] = @setup if @setup + args[:teardown] = @teardown if @teardown + @mapper_class = @map.is_a?(Proc) ? Mandy::Mappers::Base.compile(args, &@map) : @map + @modules.each {|m| @mapper_class.send(:include, m) } + @mapper_class + end + + def compile_reduce + args = {} + args[:setup] = @setup if @setup + args[:teardown] = @teardown if @teardown + @reducer_class = @reduce.is_a?(Proc) ? Mandy::Reducers::Base.compile(args, &@reduce) : @reduce + @modules.each {|m| @reducer_class.send(:include, m) } + @reducer_class + end + def reducer_defined? - @reducer_class != Mandy::Reducers::PassThroughReducer + reducer_class != Mandy::Reducers::PassThroughReducer end end end \ No newline at end of file