require 'fileutils'
require 'cascading/expr_stub'

module Cascading
  # Maps DSL type symbols to the java_class of the corresponding boxed Java
  # type.
  JAVA_TYPE_MAP = {
    :int => java.lang.Integer.java_class, :long => java.lang.Long.java_class,
    :bool => java.lang.Boolean.java_class, :double => java.lang.Double.java_class,
    :float => java.lang.Float.java_class, :string => java.lang.String.java_class,
  }

  # FIXME: I consider $jobconf_properties to be a hack forced on us by the lack
  # of properties handling in earlier versions of the gem.  Fully removing the
  # hack would mean introducing a Job abstraction that instantiates user code,
  # and allowing jading's runner to pass properties into that.  I've already
  # taken the step of threading properties through cascades and flows rather
  # than merging properties before connect, but we still require the global
  # properties hack to integrate with external runner code (jading).
  #
  # Note that this would also let us get rid of the global "registries" of
  # cascades and flows.  I've already eliminated most uses of these registries,
  # but they are still required for the runner to find user code required in a
  # previous step.  A Job abstraction would clean this up as well.
  #
  # For now, it is important that people use these constructors rather than
  # directly building their own cascades and flows, so that jading can send
  # them default properties.

  # Builds a top-level cascade given a name and a block.  Optionally accepts a
  # :mode, as explained in Cascading::Cascade#initialize.
  #
  # The block is instance_eval'd against the new Cascade, which is returned.
  # Raises if no block is given or if params contains :properties (properties
  # may only come from the global $jobconf_properties).  The caller's params
  # hash is never modified, so it can safely be reused across calls.
  def cascade(name, params = {}, &block)
    raise "Could not build cascade '#{name}'; block required" unless block_given?
    raise 'Cascading::cascade does not accept the :properties param only the global $jobconf_properties' if params[:properties]

    # Merge into a copy: writing into the caller's hash would make a reused
    # options hash trip the :properties guard above on the next call.
    params = params.merge(:properties => $jobconf_properties.dup) if $jobconf_properties

    cascade = Cascade.new(name, params)
    cascade.instance_eval(&block)
    cascade
  end

  # Builds a top-level flow given a name and block for applications built of
  # flows with no cascades.  Optionally accepts a :mode, as explained in
  # Cascading::Flow#initialize.
  #
  # The block is instance_eval'd against the new Flow (created with no parent),
  # which is returned.  Raises if no block is given or if params contains
  # :properties.  The caller's params hash is never modified.
  def flow(name, params = {}, &block)
    raise "Could not build flow '#{name}'; block required" unless block_given?
    raise 'Cascading::flow does not accept the :properties param only the global $jobconf_properties' if params[:properties]

    # Merge into a copy so the caller's hash is left untouched (see #cascade).
    params = params.merge(:properties => $jobconf_properties.dup) if $jobconf_properties

    flow = Flow.new(name, nil, params)
    flow.instance_eval(&block)
    flow
  end

  # Returns the descriptions of all registered cascades, one per line.
  def describe
    Cascade.all.map { |cascade| cascade.describe }.join("\n")
  end
  alias desc describe

  # See ExprStub.expr
  def expr(expression, params = {})
    ExprStub.expr(expression, params)
  end

  # Creates a cascading.tuple.Fields instance from a string or an array of
  # strings (Ruby integers are accepted as field positions).
  #
  # * nil returns nil.
  # * An existing Fields instance is returned unchanged.
  # * A single-element array is unwrapped and converted recursively.
  # * An array containing nil raises.
  def fields(spec)
    return nil if spec.nil?
    return spec if spec.is_a?(Java::CascadingTuple::Fields)

    if spec.is_a?(::Array)
      return fields(spec.first) if spec.size == 1
      raise "Fields cannot be nil: #{spec.inspect}" if spec.include?(nil)
    end

    # Integer positions must be boxed as java.lang.Integer to satisfy the
    # Comparable[] constructor.  ::Integer replaces the removed Fixnum class.
    names = [spec].flatten.map { |f| f.is_a?(::Integer) ? java.lang.Integer.new(f) : f }
    Java::CascadingTuple::Fields.new(names.to_java(java.lang.Comparable))
  end

  # The Fields::ALL selector.
  def all_fields
    Java::CascadingTuple::Fields::ALL
  end

  # Union of the given field sets.  Order is preserved and duplicates are
  # dropped.  Each argument must respond to #to_a.
  def union_fields(*field_sets)
    fields(field_sets.inject([]) { |acc, set| acc | set.to_a })
  end

  # The first field set minus every subsequent one.  With no arguments this
  # yields an empty field list rather than failing on nil.
  def difference_fields(*field_sets)
    first, *rest = field_sets
    fields(rest.inject(first.to_a) { |acc, set| acc - set.to_a })
  end

  # Returns source_fields.select(all_fields), i.e. a copy of the selection.
  def copy_fields(source_fields)
    source_fields.select(all_fields)
  end

  # Concatenates declarator Fields, renaming colliding names by appending
  # underscores.  Raises unless every argument is a declarator.
  def dedup_fields(*field_sets)
    raise 'Can only be applied to declarators' unless field_sets.all? { |f| f.is_declarator? }
    fields(dedup_field_names(*field_sets.map { |f| f.to_a }))
  end

  # Concatenates the given name arrays, suffixing '_' onto any name already
  # emitted until it is unique.
  #
  # Names are checked one at a time against everything emitted so far,
  # including earlier names from the same array.  Previously a rename could
  # collide with a sibling, e.g. (['a'], ['a', 'a_']) produced
  # ['a', 'a_', 'a_'].
  def dedup_field_names(*names)
    names.each_with_object([]) do |arr, acc|
      arr.each { |name| acc << search_field_name(acc, name) }
    end
  end

  # Appends '_' to candidate until it does not appear in names.
  def search_field_name(names, candidate)
    candidate = "#{candidate}_" while names.include?(candidate)
    candidate
  end

  # The Fields::VALUES selector.
  def last_grouping_fields
    Java::CascadingTuple::Fields::VALUES
  end

  # The Fields::RESULTS selector.
  def results_fields
    Java::CascadingTuple::Fields::RESULTS
  end

  # Creates a TextLine scheme (can be used in both Cascading local and hadoop
  # modes).  Positional args are used if <tt>:source_fields</tt> is not
  # provided.
  #
  # The named options are:
  # * <tt>:source_fields</tt> a string or array of strings.  Specifies the
  #   fields to be read from a source with this scheme.  Defaults to ['offset', 'line'].
  # * <tt>:sink_fields</tt> a string or array of strings. Specifies the fields
  #   to be written to a sink with this scheme.  Defaults to all_fields.
  # * <tt>:compression</tt> a symbol, either <tt>:enable</tt> or
  #   <tt>:disable</tt>, that governs the TextLine scheme's compression.  Defaults
  #   to the default TextLine compression (only applies to c.s.h.TextLine).
  #
  # Returns a hash with :local_scheme and :hadoop_scheme entries.
  def text_line_scheme(*args)
    options = args.extract_options!
    source_fields = fields(options[:source_fields] || (args.empty? ? ['offset', 'line'] : args))
    sink_fields = fields(options[:sink_fields]) || all_fields
    sink_compression = case options[:compression]
      when :enable  then Java::CascadingSchemeHadoop::TextLine::Compress::ENABLE
      when :disable then Java::CascadingSchemeHadoop::TextLine::Compress::DISABLE
      else Java::CascadingSchemeHadoop::TextLine::Compress::DEFAULT
    end

    {
      :local_scheme => Java::CascadingSchemeLocal::TextLine.new(source_fields, sink_fields),
      :hadoop_scheme => Java::CascadingSchemeHadoop::TextLine.new(source_fields, sink_fields, sink_compression),
    }
  end

  # Creates a c.s.h.SequenceFile scheme instance from the specified fields
  # (all_fields when none are given).  A local SequenceFile scheme is not
  # provided by Cascading, so :local_scheme is nil and this scheme cannot be
  # used in Cascading local mode.
  def sequence_file_scheme(*field_names)
    {
      :local_scheme => nil,
      :hadoop_scheme => Java::CascadingSchemeHadoop::SequenceFile.new(field_names.empty? ? all_fields : fields(field_names)),
    }
  end

  # Combines the given taps into a single multi-source tap.
  def multi_source_tap(*taps)
    MultiTap.multi_source_tap(taps)
  end

  # Combines the given taps into a single multi-sink tap.
  def multi_sink_tap(*taps)
    MultiTap.multi_sink_tap(taps)
  end

  # Creates a Cascading::Tap given a path and optional :scheme and :sink_mode.
  #
  # NOTE(review): if this module is ever mixed into Object, this method shadows
  # Kernel#tap for every object.  Confirm how callers include Cascading.
  def tap(path, params = {})
    Tap.new(path, params)
  end

  # Constructs properties to be passed to Flow#complete or Cascade#complete
  # which will locate temporary Hadoop files in base_dir.  It is necessary to
  # pass these properties only when executing scripts in Hadoop local mode via
  # JRuby's main method, which confuses Cascading's attempt to find the
  # containing jar.  When using Cascading local mode, these are unnecessary.
  #
  # Creates base_dir/{build,tmp,log} (raising if that fails) and returns a
  # java.util.HashMap copied from a JobConf with one map and one reduce task.
  def local_properties(base_dir)
    dirs = {
      'test.build.data' => "#{base_dir}/build",
      'hadoop.tmp.dir' => "#{base_dir}/tmp",
      'hadoop.log.dir' => "#{base_dir}/log",
    }
    # FileUtils instead of shelling out to `mkdir -p`: base_dir is not parsed
    # by a shell (spaces and metacharacters are safe), and failures raise
    # instead of being silently ignored.
    dirs.each_value { |dir| FileUtils.mkdir_p(dir) }

    job_conf = Java::OrgApacheHadoopMapred::JobConf.new
    # Presumably setting the jar explicitly is what works around the jar lookup
    # problem described above -- confirm before changing.
    job_conf.jar = dirs['test.build.data']
    dirs.each { |key, dir| job_conf.set(key, dir) }

    job_conf.num_map_tasks = 1
    job_conf.num_reduce_tasks = 1

    properties = java.util.HashMap.new
    Java::CascadingFlowHadoopPlanner::HadoopPlanner.copy_job_conf(properties, job_conf)
    properties
  end
end