lib/job.rb in mandy-0.3.6 vs lib/job.rb in mandy-0.3.7
- old
+ new
@@ -15,12 +15,10 @@
def initialize(name, &blk)
@name = name
@settings = {}
@modules = []
- @mapper_class = Mandy::Mappers::PassThroughMapper
- @reducer_class = Mandy::Reducers::PassThroughReducer
set('mapred.job.name', name)
instance_eval(&blk) if blk
end
def mixin(*modules)
@@ -55,38 +53,70 @@
else
raise "Unknown store type #{type}"
end
end
+ def setup(&blk)
+ @setup = blk
+ end
+
+ def teardown(&blk)
+ @teardown = blk
+ end
+
def map(klass=nil, &blk)
- @mapper_class = klass || Mandy::Mappers::Base.compile(&blk)
- @modules.each {|m| @mapper_class.send(:include, m) }
- @mapper_class
+ @map = klass || blk
end
def reduce(klass=nil, &blk)
- @reducer_class = klass || Mandy::Reducers::Base.compile(&blk)
- @modules.each {|m| @reducer_class.send(:include, m) }
- @reducer_class
+ @reduce = klass || blk
end
def run_map(input=STDIN, output=STDOUT, &blk)
- @mapper_class.send(:include, Mandy::IO::OutputFormatting) unless reducer_defined?
- mapper = @mapper_class.new(input, output, @input_format, @output_format)
+ mapper_class.send(:include, Mandy::IO::OutputFormatting) unless reducer_defined?
+ mapper = mapper_class.new(input, output, @input_format, @output_format)
yield(mapper) if blk
mapper.execute
end
def run_reduce(input=STDIN, output=STDOUT, &blk)
- reducer = @reducer_class.new(input, output, @input_format, @output_format)
+ reducer = reducer_class.new(input, output, @input_format, @output_format)
yield(reducer) if blk
reducer.execute
end
private
+
+ def mapper_class
+ return Mandy::Mappers::PassThroughMapper unless @map
+ @mapper_class ||= compile_map
+ end
+ def reducer_class
+ return Mandy::Reducers::PassThroughReducer unless @reduce
+ @reducer_class ||= compile_reduce
+ end
+
+ def compile_map
+ args = {}
+ args[:setup] = @setup if @setup
+ args[:teardown] = @teardown if @teardown
+ @mapper_class = @map.is_a?(Proc) ? Mandy::Mappers::Base.compile(args, &@map) : @map
+ @modules.each {|m| @mapper_class.send(:include, m) }
+ @mapper_class
+ end
+
+ def compile_reduce
+ args = {}
+ args[:setup] = @setup if @setup
+ args[:teardown] = @teardown if @teardown
+ @reducer_class = @reduce.is_a?(Proc) ? Mandy::Reducers::Base.compile(args, &@reduce) : @reduce
+ @modules.each {|m| @reducer_class.send(:include, m) }
+ @reducer_class
+ end
+
def reducer_defined?
- @reducer_class != Mandy::Reducers::PassThroughReducer
+ reducer_class != Mandy::Reducers::PassThroughReducer
end
end
end
\ No newline at end of file