require "helper"
require "fluent/test/driver/input"
require "fluent/test/helpers"
require "timecop"

class MongoTailInputTest < Test::Unit::TestCase
  def setup
    Fluent::Test.setup
    setup_mongod
  end

  def teardown
    teardown_mongod
  end

  def collection_name
    'test'
  end

  def database_name
    'fluent_test'
  end

  def port
    27017
  end

  def default_config
    %[
      type mongo_tail
      database test
      collection log
      tag_key tag
      time_key time
      id_store_file /tmp/fluent_mongo_last_id
    ]
  end

  def setup_mongod
    options = {}
    options[:database] = database_name
    @client = ::Mongo::Client.new(["localhost:#{port}"], options)
  end

  def teardown_mongod
    @client[collection_name].drop
  end

  def create_driver(conf=default_config)
    Fluent::Test::Driver::Input.new(Fluent::Plugin::MongoTailInput).configure(conf)
  end

  def test_configure
    d = create_driver
    assert_equal('localhost', d.instance.host)
    assert_equal(27017, d.instance.port)
    assert_equal('test', d.instance.database)
    assert_equal('log', d.instance.collection)
    assert_equal('tag', d.instance.tag_key)
    assert_equal('time', d.instance.time_key)
    assert_equal('/tmp/fluent_mongo_last_id', d.instance.id_store_file)
  end

  def test_configure_with_logger_conf
    d = create_driver(default_config + %[
      mongo_log_level error
    ])

    expected = "error"
    assert_equal(expected, d.instance.mongo_log_level)
  end

  class TailInputTest < self
    include Fluent::Test::Helpers

    def setup_mongod
      options = {}
      options[:database] = database_name
      @client = ::Mongo::Client.new(["localhost:#{port}"], options)
      @time = Time.now
      Timecop.freeze(@time)
    end

    def teardown_mongod
      @client[collection_name].drop
      Timecop.return
    end

    def test_emit
      d = create_driver(%[
        @type mongo_tail
        database #{database_name}
        collection #{collection_name}
        tag input.mongo
        time_key time
      ])
      d.run(expect_records: 1, timeout: 5) do
        @client[collection_name].insert_one({message: "test"})
      end
      events = d.events
      assert_equal "input.mongo", events[0][0]
      assert_equal event_time(@time.to_s), events[0][1]
      assert_equal "test", events[0][2]["message"]
    end

    def test_emit_with_tag_time_keys
      d = create_driver(%[
        @type mongo_tail
        database #{database_name}
        collection #{collection_name}
        tag input.mongo
        tag_key tag
        time_key time
      ])
      d.run(expect_records: 1, timeout: 5) do
        @client[collection_name].insert_one({message: "test", tag: "user.defined", time: Fluent::Engine.now})
      end
      events = d.events
      assert_equal "user.defined", events[0][0]
      assert_equal event_time(@time.to_s), events[0][1]
      assert_equal "test", events[0][2]["message"]
    end

    def test_emit_after_last_id
      d = create_driver(%[
        @type mongo_tail
        database #{database_name}
        collection #{collection_name}
        tag input.mongo.last_id
        time_key time
      ])
      @client[collection_name].insert_one({message: "can't obtain"})

      d.run(expect_records: 1, timeout: 5) do
        @client[collection_name].insert_one({message: "can obtain"})
      end
      events = d.events
      assert_equal 1, events.size
      assert_equal "input.mongo.last_id", events[0][0]
      assert_equal event_time(@time.to_s), events[0][1]
      assert_equal "can obtain", events[0][2]["message"]
    end
  end

  class MongoAuthenticateTest < self
    require 'fluent/plugin/mongo_auth'
    include ::Fluent::MongoAuth

    def setup_mongod
      options = {}
      options[:database] = database_name
      @client = ::Mongo::Client.new(["localhost:#{port}"], options)
      @client.database.users.create('fluent', password: 'password',
                                    roles: [Mongo::Auth::Roles::READ_WRITE])
    end

    def teardown_mongod
      @client[collection_name].drop
      @client.database.users.remove('fluent')
    end

    def test_authenticate
      d = create_driver(default_config + %[
        user fluent
        password password
      ])

      assert authenticate(@client)
    end
  end
end