module MapreduceHelper
# You can include the MapreduceHelper into your class to give you standard self.map and self.reduce methods.
# You need only implement self.map_each and self.reduce_each methods which accept a single item (istead of an arrad)
#
# Example Usage:
# This example is a bit contrived.
# 
#   class MapReduceTest
#     include MapreduceHelper
#   
#     def self.run
#       job = Skynet::Job.new(
#         :mappers          => 2, 
#         :reducers         => 1,
#         :map_reduce_class => self,
#         :map_data         => ['http://www.geni.com'.'http://www.yahoo.com','http://www.cnet.com']
#       )    
#       results = job.run
#     end
#   
#     def self.map_each(url)
#       SomeUrlSlurper.gather_results(url)  # returns an array of urls of sites that link to the given url
#     end
#   
#     def self.reduce_each(linked_from_url)
#       SomeUrlSluper.find_text("mysite", linked_from_url)   # finds all the times "mysite" appears in the given url, which we know links to the url given in the map_data
#     end
#   end
# 
#   MapReduceTest.run

  
  def self.included(base)
    base.extend MapreduceHelper
  end               

  # Takes an array of map_data, iterates over that array calling self.map_each(item) for each 
  # item in that array.   Catches exceptions in each iteration and continues processing.
  def map(map_data_array)
    raise Skynet::Job::BadMapOrReduceError.new("#{self.class} has no self.map_each method.") unless self.respond_to?(:map_each)
    if map_data_array.is_a?(Array)
      results = []
      map_data_array.each do |data|
        begin
          results << map_each(data)
        rescue Exception => e
          error "ERROR IN #{self} [#{e.class} #{e.message}] #{e.backtrace.join("\n")}"
        end                
      end
      results
    else
      map_each(map_data_array)
    end
  end
  
  # Takes an array of post reduce_partitioned data, iterates over that array calling self.reduce_each(item) for each 
  # item in that array.   Catches exceptions in each iteration and continues processing.
  def reduce(reduce_partitioned_data_array)
    return reduce_partitioned_data_array unless self.respond_to?(:reduce_each)
    if reduce_partitioned_data_array.is_a?(Array)
      results = []
      reduce_partitioned_data_array.each do |data|
        begin
          results << reduce_each(data)
        rescue Exception => e
          error "ERROR IN #{self} [#{e.class} #{e.message}] #{e.backtrace.join("\n")}"
        end        
      end
      results
    else
      reduce_each(reduce_partitioned_data_array)
    end
  end  
end