Sha256: 3fb044c452fc8a507b77ff7f8a939c63c9620e379190dd4a4c231667afb554bd

Contents?: true

Size: 1.71 KB

Versions: 1

Compression:

Stored size: 1.71 KB

Contents

module Mandy
  class Job
    class << self
      def jobs
        @jobs ||= []
      end
      
      def find_by_name(name)
        jobs.find {|job| job.name == name }
      end
    end
    
    attr_reader :settings
    attr_reader :name
    
    def initialize(name, &blk)
      @name = name
      @settings = {}
      @modules = []
      @mapper_class = Mandy::Mappers::PassThroughMapper
      @reducer_class = Mandy::Reducers::PassThroughReducer
      set('mapred.job.name', name)
      instance_eval(&blk) if blk
    end
    
    def mixin(*modules)
      modules.each {|m| @modules << m}
    end
    
    def set(key, value)
      @settings[key.to_s] = value.to_s
    end
    
    def map_tasks(count)
      set('mapred.map.tasks', count)
    end
    
    def reduce_tasks(count)
      set('mapred.reduce.tasks', count)
    end
    
    def store(type, name, options={})
      Mandy.stores[name] = case type
      when :hbase
        Stores::HBase.new(options)
      else
        raise "Unknown store type #{type}"
      end
    end
    
    def map(klass=nil, &blk)
      @mapper_class = klass || Mandy::Mappers::Base.compile(&blk)
      @modules.each {|m| @mapper_class.send(:include, m) }
      @mapper_class
    end
    
    def reduce(klass=nil, &blk)
      @reducer_class = klass || Mandy::Reducers::Base.compile(&blk)
      @modules.each {|m| @reducer_class.send(:include, m) }
      @reducer_class
    end
    
    def run_map(input=STDIN, output=STDOUT, &blk)
      mapper = @mapper_class.new(input, output)
      yield(mapper) if blk
      mapper.execute
    end
    
    def run_reduce(input=STDIN, output=STDOUT, &blk)
      reducer = @reducer_class.new(input, output)
      yield(reducer) if blk
      reducer.execute
    end
  end
end

Version data entries

1 entries across 1 versions & 1 rubygems

Version Path
trafficbroker-mandy-0.2.12 lib/job.rb