module DaruLite
  module Core
    class MergeFrame
      # Stand-in object substituted for nil merge-key values (see
      # #sanitize_merge_keys).
      #
      # An instance reports itself as nil, never compares equal to anything
      # through +==+, and through +<=>+ ranks level with other nil-like values
      # and below every non-nil value.
      class NilSorter
        include Comparable

        # @param other [Object] value to compare against
        # @return [Integer] 0 when +other+ answers true to +nil?+, -1 otherwise
        def <=>(other)
          return 0 if other.nil?

          -1
        end

        # Equality is always refused, so two nil keys never match each other.
        #
        # @return [false]
        def ==(_other)
          false
        end

        # @return [true]
        def nil?
          true
        end
      end

      # quick-fix for issue #171
      # Prepares both dataframes for a sorted merge.
      #
      # Each frame is converted to an array of row hashes, ordered by the
      # values of the +on+ columns (nil-safe, via #safe_compare), and paired
      # with a parallel array of sanitized key tuples. The column renaming
      # maps for both sides are computed last.
      #
      # @param left_df [DaruLite::DataFrame] left-hand frame
      # @param right_df [DaruLite::DataFrame] right-hand frame
      # @param opts [Hash] reads :on, :how and :indicator (see #init_opts)
      # @raise [ArgumentError] for an unknown :how or a missing +on+ vector
      def initialize(left_df, right_df, opts = {})
        init_opts(opts)
        validate_on!(left_df, right_df)

        by_merge_key = ->(row_a, row_b) { safe_compare(row_a.values_at(*on), row_b.values_at(*on)) }
        key_tuple_of = ->(row_hash) { sanitize_merge_keys(row_hash.values_at(*on)) }

        @left = df_to_a(left_df).sort(&by_merge_key)
        @left_key_values = @left.map(&key_tuple_of)

        @right = df_to_a(right_df).sort(&by_merge_key)
        @right_key_values = @right.map(&key_tuple_of)

        @left_keys, @right_keys = merge_keys(left_df, right_df, on)
      end

      # Consumes both sorted row queues and builds the merged dataframe.
      #
      # On every pass the leading key of each side is handed to #row, which
      # advances at least one queue; rows it returns as nil/false are skipped.
      #
      # @return [DaruLite::DataFrame] frame built from the merged rows, with
      #   vectors ordered as given by #dataframe_vector_names
      def join
        merged_rows = []

        until [left, right].all?(&:empty?)
          merged = row(first_left_key, first_right_key)
          merged_rows << merged if merged
        end

        DaruLite::DataFrame.new(merged_rows, order: dataframe_vector_names)
      end

      private

      # Readers for the merge state:
      #   on, indicator         - taken from opts[:on] / opts[:indicator] in #init_opts
      #   left, right           - row hashes from #df_to_a, sorted by the +on+ values
      #   *_key_values          - sanitized key tuples parallel to left / right
      #   keep_left, keep_right - flags picked from LEFT_RIGHT_COMBINATIONS by :how
      #   left_keys, right_keys - original => output column name maps from #merge_keys
      attr_reader :on, :indicator,
                  :left, :left_key_values, :keep_left, :left_keys,
                  :right, :right_key_values, :keep_right, :right_keys

      # Key tuple of the rows currently being matched; assigned in #row when
      # the leading left and right keys are equal.
      attr_accessor :merge_key

      # Maps each accepted :how option to the pair [keep_left, keep_right].
      # #extract_left_right looks the pair up, and the flags decide whether
      # unmatched rows of the left / right frame are emitted
      # (see #left_row_missing_right and #right_row_missing_left).
      LEFT_RIGHT_COMBINATIONS = {
        #       left   right
        inner: [false, false],
        left: [true, false],
        right: [false, true],
        outer: [true, true]
      }.freeze

      # Copies the supported options into instance state.
      #
      # @param opts [Hash] reads :on, :how and :indicator
      # @raise [ArgumentError] when opts[:how] is not a known join type
      def init_opts(opts)
        @on = opts[:on]

        left_flag, right_flag = extract_left_right(opts[:how])
        @keep_left = left_flag
        @keep_right = right_flag

        @indicator = opts[:indicator]
      end

      # Column order of the resulting dataframe: renamed left columns, the
      # +on+ columns, renamed right columns, then the indicator column when
      # one was requested.
      #
      # @return [Array] vector names in output order
      def dataframe_vector_names
        # Hash#values returns a fresh array, so appending to it is safe.
        left_keys.values.concat(on, right_keys.values, Array(indicator))
      end

      # Looks up the [keep_left, keep_right] pair for a join type.
      #
      # @param how [Symbol] key of LEFT_RIGHT_COMBINATIONS
      # @return [Array<Boolean>] the two keep flags
      # @raise [ArgumentError] when +how+ is not a key of LEFT_RIGHT_COMBINATIONS
      def extract_left_right(how)
        keep_flags = LEFT_RIGHT_COMBINATIONS[how]
        raise ArgumentError, "Unrecognized join option: #{how}" unless keep_flags

        keep_flags
      end

      # Replaces every nil-like entry of a key tuple with a fresh NilSorter so
      # the tuple can be ordered; other values pass through unchanged.
      #
      # @param merge_keys [Array] raw key values of one row
      # @return [Array] new array with nil entries swapped for NilSorter
      def sanitize_merge_keys(merge_keys)
        merge_keys.map do |value|
          next value unless value.nil?

          NilSorter.new
        end
      end

      # Converts a dataframe into an array of row hashes keyed by column name.
      #
      # @param df [#to_h] object whose +to_h+ maps column names to values
      #   responding to +to_a+
      # @return [Array<Hash>] one hash per row
      def df_to_a(df)
        # FIXME: this is much faster than the "native" DataFrame#to_a, which
        # it should not be.
        columns = df.to_h
        names = columns.keys
        rows = columns.values.map(&:to_a).transpose
        rows.map { |cells| names.zip(cells).to_h }
      end

      # Builds the column renaming maps for both frames.
      #
      # Non-key column names that occur exactly twice across the two frames
      # are treated as clashing and get a numeric suffix through #guard_keys.
      #
      # @param df1 [#vectors] left frame
      # @param df2 [#vectors] right frame
      # @param on [Array] key column names, excluded from renaming
      # @return [Array<Hash>] pair of original => output name maps (left, right)
      def merge_keys(df1, df2, on)
        left_names = df1.vectors.to_a - on
        right_names = df2.vectors.to_a - on

        occurrences = Hash.new(0)
        (left_names + right_names).each { |name| occurrences[name] += 1 }
        duplicates = occurrences.select { |_, seen| seen == 2 }.keys

        [
          guard_keys(left_names, duplicates, 1),
          guard_keys(right_names, duplicates, 2)
        ]
      end

      # Maps every column name to its output name, suffixing clashing ones.
      #
      # @param keys [Array] column names of one frame
      # @param duplicates [Array] names that clash between the frames
      # @param num [Integer] suffix number for this frame
      # @return [Hash] original name => output name
      def guard_keys(keys, duplicates, num)
        keys.each_with_object({}) do |name, renamed|
          renamed[name] = guard_duplicate(name, duplicates, num)
        end
      end

      # Returns the output name for one column.
      #
      # @param val [Object] original column name
      # @param duplicates [Array] names that clash between the frames
      # @param num [Integer] suffix number for this frame
      # @return [Object] +val+ itself, or the symbol "<val>_<num>" when +val+
      #   is listed in +duplicates+
      def guard_duplicate(val, duplicates, num)
        return val unless duplicates.include?(val)

        :"#{val}_#{num}"
      end

      # Produces the next output row for the leading keys of both queues.
      #
      # Equal keys yield a merged row; otherwise the side holding the smaller
      # key (or the only side that still has rows) is advanced on its own.
      #
      # @param lkey [Array, nil] leading left key tuple, nil when left is exhausted
      # @param rkey [Array, nil] leading right key tuple, nil when right is exhausted
      # @return [Hash, nil] the row to emit, or nil when the row is dropped
      def row(lkey, rkey)
        # :nocov:
        # It's just an impossibility handler, can't be covered :)
        raise 'Unexpected condition met during merge' unless lkey || rkey

        # :nocov:
        if lkey == rkey
          self.merge_key = lkey
          return add_indicator(merge_matching_rows, :both)
        end

        left_goes_first = !rkey || lt(lkey, rkey)
        return add_indicator(left_row_missing_right, :left_only) if left_goes_first

        # Remaining case: !lkey || lt(rkey, lkey)
        add_indicator(right_row_missing_left, :right_only)
      end

      # Stamps the indicator column onto a row when an indicator was requested.
      #
      # Unmatched rows are dropped (nil) by #left_row_missing_right and
      # #right_row_missing_left when the join type does not keep that side;
      # such nil rows are passed through untouched instead of being indexed.
      #
      # @param row [Hash, nil] row to annotate, or nil for a dropped row
      # @param indicator_value [Symbol] :both, :left_only or :right_only
      # @return [Hash, nil] the same row (mutated in place), or nil
      def add_indicator(row, indicator_value)
        return row unless indicator && row

        row[indicator] = indicator_value
        row
      end

      # Merges the rows at the head of both queues, which share #merge_key.
      #
      # Three situations are handled:
      # * the key occurs once on each side: both rows are consumed and merged;
      # * the key repeats on one side only: the heads are merged and
      #   #one_to_many_shift decides which queue advances;
      # * the key repeats on both sides: rows are served one at a time from
      #   the memoized cartesian product, and the queues are advanced only
      #   once it has been drained.
      #
      # @return [Hash] the merged row
      def merge_matching_rows
        return merge_rows(one_to_one_left_row, one_to_one_right_row) if one_to_one_merge?

        if one_to_many_merge?
          merged = merge_rows(left.first, right.first)
          one_to_many_shift
          return merged
        end

        merged = cartesian_product.shift
        end_cartesian_product if cartesian_product.empty?
        merged
      end

      # Advances the queues after a one-to-many match.
      #
      # Both decisions are taken before anything is shifted: the left queue
      # advances when the right key is about to change, and the right queue
      # advances when the left key is about to change.
      #
      # @return [Hash, nil] the shifted right row, or nil when the right
      #   queue was not advanced
      def one_to_many_shift
        advance_left, advance_right = [
          first_right_key != next_right_key,
          first_left_key != next_left_key
        ]

        one_to_one_left_row if advance_left
        return unless advance_right

        one_to_one_right_row
      end

      # @return [Boolean] true when the current merge key is repeated by
      #   neither the next left row nor the next right row
      def one_to_one_merge?
        !(merge_key == next_left_key || merge_key == next_right_key)
      end

      # @return [Boolean] true unless the current merge key is repeated by
      #   both the next left row and the next right row
      def one_to_many_merge?
        merge_key != next_left_key || merge_key != next_right_key
      end

      # Consumes the head of the left queue together with its key tuple.
      #
      # @return [Hash, nil] the removed left row, nil when the queue is empty
      def one_to_one_left_row
        left.shift.tap { left_key_values.shift }
      end

      # Consumes the head of the right queue together with its key tuple.
      #
      # @return [Hash, nil] the removed right row, nil when the queue is empty
      def one_to_one_right_row
        right.shift.tap { right_key_values.shift }
      end

      # Consumes a left row that has no partner on the right.
      #
      # The row is always removed from the queue; it is only expanded into an
      # output row when the join type keeps unmatched left rows.
      #
      # @return [Hash, nil] the expanded row, or nil when it is dropped
      def left_row_missing_right
        unmatched = one_to_one_left_row
        return unless keep_left

        expand_row(unmatched, left_keys)
      end

      # Consumes a right row that has no partner on the left.
      #
      # The row is always removed from the queue; it is only expanded into an
      # output row when the join type keeps unmatched right rows.
      #
      # @return [Hash, nil] the expanded row, or nil when it is dropped
      def right_row_missing_left
        unmatched = one_to_one_right_row
        return unless keep_right

        expand_row(unmatched, right_keys)
      end

      # Strict "less than" built on +<=>+.
      #
      # @param k1 [Object] left operand
      # @param k2 [Object] right operand
      # @return [Boolean] true only when +k1 <=> k2+ is -1; an incomparable
      #   pair (nil result) counts as not less
      def lt(k1, k2)
        ordering = k1 <=> k2
        ordering == -1
      end

      # Combines a left and a right row into one output row.
      #
      # Keys are inserted in this order: renamed left columns, the +on+
      # columns (values taken from the left row), the indicator placeholder
      # (nil) when an indicator is configured, then renamed right columns.
      #
      # @param lrow [Hash] row from the left frame
      # @param rrow [Hash] row from the right frame
      # @return [Hash] the merged row
      def merge_rows(lrow, rrow)
        merged = {}
        left_keys.each { |from, to| merged[to] = lrow[from] }
        on.each { |col| merged[col] = lrow[col] }
        merged[indicator] = nil if indicator
        right_keys.each { |from, to| merged[to] = rrow[from] }
        merged
      end

      # Turns an unmatched row of one frame into an output row.
      #
      # Keys are inserted in this order: the renamed columns of that frame,
      # the +on+ columns, then the indicator placeholder (nil) when an
      # indicator is configured.
      #
      # @param row [Hash] row from one of the frames
      # @param renamings [Hash] original => output column names of that frame
      # @return [Hash] the expanded row
      def expand_row(row, renamings)
        expanded = {}
        renamings.each { |from, to| expanded[to] = row[from] }
        on.each { |col| expanded[col] = row[col] }
        expanded[indicator] = nil if indicator
        expanded
      end

      # @return [Array, nil] key tuple of the leading right row; nil when the
      #   right queue is empty (Array#first already returns nil in that case)
      def first_right_key
        right_key_values.first
      end

      # @return [Array, nil] key tuple of the second right row, nil when the
      #   right queue holds fewer than two rows
      def next_right_key
        right_key_values.at(1)
      end

      # @return [Array, nil] key tuple of the leading left row; nil when the
      #   left queue is empty (Array#first already returns nil in that case)
      def first_left_key
        left_key_values.first
      end

      # @return [Array, nil] key tuple of the second left row, nil when the
      #   left queue holds fewer than two rows
      def next_left_key
        left_key_values.at(1)
      end

      # @return [Array<Hash>] the leading run of left rows whose sanitized
      #   key tuple equals the current merge key
      def left_rows_at_merge_key
        left.take_while do |candidate|
          candidate_key = sanitize_merge_keys(candidate.values_at(*on))
          candidate_key == merge_key
        end
      end

      # @return [Array<Hash>] the leading run of right rows whose sanitized
      #   key tuple equals the current merge key
      def right_rows_at_merge_key
        right.take_while do |candidate|
          candidate_key = sanitize_merge_keys(candidate.values_at(*on))
          candidate_key == merge_key
        end
      end

      # Memoized list of merged rows for every pairing of the left and right
      # rows that share the current merge key (left-major order). The cache is
      # cleared by #end_cartesian_product.
      #
      # @return [Array<Hash>] remaining merged rows of the current product
      def cartesian_product
        @cartesian_product ||= begin
          left_rows = left_rows_at_merge_key
          right_rows = right_rows_at_merge_key

          left_rows.flat_map do |left_row|
            right_rows.map { |right_row| merge_rows(left_row, right_row) }
          end
        end
      end

      # Finishes a many-to-many match: drops every row (and key tuple) that
      # carries the current merge key from both queues and clears the
      # memoized product.
      #
      # @return [nil]
      def end_cartesian_product
        left_count = left_rows_at_merge_key.size
        right_count = right_rows_at_merge_key.size

        [left_key_values, left].each { |queue| queue.shift(left_count) }
        [right_key_values, right].each { |queue| queue.shift(right_count) }

        @cartesian_product = nil
      end

      # Ensures every +on+ field exists as a vector in both dataframes.
      #
      # @param left_df [#has_vector?] left frame
      # @param right_df [#has_vector?] right frame
      # @return [Array] the +on+ fields
      # @raise [ArgumentError] for the first field missing from either frame
      def validate_on!(left_df, right_df)
        @on.each do |field|
          next if left_df.has_vector?(field) && right_df.has_vector?(field)

          raise ArgumentError, "Both dataframes expected to have #{field.inspect} field"
        end
      end

      # Nil-safe ordering of two key tuples, used to sort rows before merging.
      #
      # Values are compared position by position and the first position that
      # differs decides the result. A nil value sorts before any non-nil
      # value, and two nils are considered equal.
      #
      # @param left_array [Array] key values of the first row
      # @param right_array [Array] key values of the second row
      # @return [Integer] result of the first differing position (negative,
      #   positive), or 0 when every position compares equal
      # @raise [ArgumentError] when two non-nil values cannot be compared
      #   with +<=>+ (for example an Integer and a String)
      def safe_compare(left_array, right_array)
        left_array.zip(right_array).each do |l, r|
          next if l.nil? && r.nil?
          return 1 if r.nil?
          return -1 if l.nil?

          order = l <=> r
          # <=> answers nil for incomparable operands; fail with a clear
          # message instead of calling a method on nil.
          raise ArgumentError, "comparison of #{l.class} with #{r.class} failed" if order.nil?
          return order unless order.zero?
        end

        0
      end
    end

    # Entry point for joining two dataframes.
    module Merge
      # Builds a MergeFrame for the two frames and runs the join.
      #
      # @param df1 [DaruLite::DataFrame] left frame
      # @param df2 [DaruLite::DataFrame] right frame
      # @param opts [Hash] options forwarded to MergeFrame.new
      # @return [DaruLite::DataFrame] whatever MergeFrame#join returns
      def self.join(df1, df2, opts = {})
        MergeFrame.new(df1, df2, opts).join
      end
    end
  end
end