require_relative '../helper'
require 'time'
require 'fileutils'
require 'fluent/event'
require 'fluent/unique_id'
require 'fluent/plugin/buffer'
require 'fluent/plugin/out_secondary_file'
require 'fluent/plugin/buffer/memory_chunk'
require 'fluent/test/driver/output'

class FileOutputSecondaryTest < Test::Unit::TestCase
  include Fluent::UniqueId::Mixin

  def setup
    Fluent::Test.setup
    FileUtils.rm_rf(TMP_DIR)
    FileUtils.mkdir_p(TMP_DIR)
  end

  TMP_DIR = File.expand_path(File.dirname(__FILE__) + "/../tmp/out_secondary_file#{ENV['TEST_ENV_NUMBER']}")

  CONFIG = %[
    directory #{TMP_DIR}
    basename out_file_test
    compress gzip
  ]

  class DummyOutput < Fluent::Plugin::Output
    def write(chunk); end
  end

  def create_primary(buffer_cofig = config_element('buffer'))
    DummyOutput.new.configure(config_element('ROOT','',{}, [buffer_cofig]))
  end

  def create_driver(conf = CONFIG, primary = create_primary)
    c = Fluent::Test::Driver::Output.new(Fluent::Plugin::SecondaryFileOutput)
    c.instance.acts_as_secondary(primary)
    c.configure(conf)
  end

  sub_test_case 'configture' do
    test 'default configuration' do
      d = create_driver %[directory #{TMP_DIR}]
      assert_equal 'dump.bin', d.instance.basename
      assert_equal TMP_DIR, d.instance.directory
      assert_equal :text, d.instance.compress
      assert_equal false, d.instance.append
    end

    test 'should be configurable' do
      d = create_driver %[
         directory #{TMP_DIR}
         basename out_file_test
         compress gzip
         append true
      ]
      assert_equal 'out_file_test', d.instance.basename
      assert_equal TMP_DIR, d.instance.directory
      assert_equal :gzip, d.instance.compress
      assert_equal true, d.instance.append
    end

    test 'should only use in secondary' do
      c = Fluent::Test::Driver::Output.new(Fluent::Plugin::SecondaryFileOutput)
      assert_raise Fluent::ConfigError.new("This plugin can only be used in the <secondary> section") do
        c.configure(CONFIG)
      end
    end

    test 'basename should not include `/`' do
      assert_raise Fluent::ConfigError.new("basename should not include `/`") do
        create_driver %[
          directory #{TMP_DIR}
          basename out/file
        ]
      end
    end

    test 'directory should be writable' do
      assert_nothing_raised do
        create_driver %[directory #{TMP_DIR}/test_dir/foo/bar/]
      end

      assert_nothing_raised do
        FileUtils.mkdir_p("#{TMP_DIR}/test_dir")
        File.chmod(0777, "#{TMP_DIR}/test_dir")
        create_driver %[directory #{TMP_DIR}/test_dir/foo/bar/]
      end

      assert_raise Fluent::ConfigError.new("out_secondary_file: `#{TMP_DIR}/test_dir/foo/bar/` should be writable") do
        FileUtils.mkdir_p("#{TMP_DIR}/test_dir")
        File.chmod(0555, "#{TMP_DIR}/test_dir")
        create_driver %[directory #{TMP_DIR}/test_dir/foo/bar/]
      end
    end

    test 'should be passed directory' do
      assert_raise Fluent::ConfigError do
        create_driver %[]
      end

      assert_nothing_raised do
        create_driver %[directory #{TMP_DIR}/test_dir/foo/bar/]
      end
    end
  end

  def check_gzipped_result(path, expect)
    # Zlib::GzipReader has a bug of concatenated file: https://bugs.ruby-lang.org/issues/9790
    # Following code from https://www.ruby-forum.com/topic/971591#979520
    result = ""
    waiting(10) do
      # we can expect that GzipReader#read can wait unflushed raw data of `io` on disk
      File.open(path, "rb") { |io|
        loop do
          gzr = Zlib::GzipReader.new(io)
          result << gzr.read
          unused = gzr.unused
          gzr.finish
          break if unused.nil?
          io.pos -= unused.length
        end
      }
    end

    assert_equal expect, result
  end

  def create_chunk(primary, metadata, es)
    primary.buffer.generate_chunk(metadata).tap do |c|
      c.concat(es.to_msgpack_stream, es.size) # to_msgpack_stream is standard_format
      c.commit
    end
  end

  sub_test_case 'write' do
    setup do
      @record = { 'key' => 'value' }
      @time = event_time
      @es = Fluent::OneEventStream.new(@time, @record)
      @primary = create_primary
      metadata = @primary.buffer.new_metadata
      @chunk = create_chunk(@primary, metadata, @es)
    end

    test 'should output compressed file when compress option is gzip' do
      d = create_driver(CONFIG, @primary)
      path = d.instance.write(@chunk)

      assert_equal "#{TMP_DIR}/out_file_test.0.gz", path
      check_gzipped_result(path, @es.to_msgpack_stream.force_encoding('ASCII-8BIT'))
    end

    test 'should output plain text when compress option is default(text)' do
      d = create_driver(%[
        directory #{TMP_DIR}/
        basename out_file_test
      ], @primary)

      msgpack_binary = @es.to_msgpack_stream.force_encoding('ASCII-8BIT')

      path = d.instance.write(@chunk)
      assert_equal "#{TMP_DIR}/out_file_test.0", path
      waiting(5) do
        sleep 0.1 until File.stat(path).size == msgpack_binary.size
      end

      assert_equal File.read(path), msgpack_binary
    end

    test 'path should be incremental when append option is false' do
      d = create_driver(CONFIG, @primary)
      packed_value = @es.to_msgpack_stream.force_encoding('ASCII-8BIT')

      5.times do |i|
        path = d.instance.write(@chunk)
        assert_equal "#{TMP_DIR}/out_file_test.#{i}.gz", path
        check_gzipped_result(path, packed_value)
      end
    end

    test 'path should be unchanged when append option is true' do
      d = create_driver(CONFIG + %[append true], @primary)
      packed_value = @es.to_msgpack_stream.force_encoding('ASCII-8BIT')

      [*1..5].each do |i|
        path = d.instance.write(@chunk)
        assert_equal "#{TMP_DIR}/out_file_test.gz", path
        check_gzipped_result(path, packed_value * i)
      end
    end
  end

  sub_test_case 'Syntax of placeholders' do
    data(
      tag: '${tag}',
      tag_index: '${tag[0]}',
      tag_index1: '${tag[10]}',
      variable: '${key1}',
      variable2: '${key@value}',
      variable3: '${key_value}',
      variable4: '${key.value}',
      variable5: '${key-value}',
      variable6: '${KEYVALUE}',
      variable7: '${tags}',
      variable8: '${tag${key}', # matched ${key}
    )
    test 'matches with a valid placeholder' do |path|
      assert Fluent::Plugin::SecondaryFileOutput::PLACEHOLDER_REGEX.match(path)
    end

    data(
      invalid_tag: 'tag',
      invalid_tag2: '{tag}',
      invalid_tag3: '${tag',
      invalid_tag4: '${tag0]}',
      invalid_tag5: '${tag[]]}',
      invalid_variable: '${key[0]}',
      invalid_variable2: '${key{key2}}',
    )
    test "doesn't match with an invalid placeholder" do |path|
      assert !Fluent::Plugin::SecondaryFileOutput::PLACEHOLDER_REGEX.match(path)
    end
  end

  sub_test_case 'path' do
    setup do
      @record = { 'key' => 'value' }
      @time = event_time
      @es = Fluent::OneEventStream.new(@time, @record)
      primary = create_primary
      m = primary.buffer.new_metadata
      @c = create_chunk(primary, m, @es)
    end

    test 'normal path when compress option is gzip' do
      d = create_driver
      path = d.instance.write(@c)
      assert_equal "#{TMP_DIR}/out_file_test.0.gz", path
    end

    test 'normal path when compress option is default' do
      d = create_driver %[
        directory #{TMP_DIR}
        basename out_file_test
      ]
      path = d.instance.write(@c)
      assert_equal "#{TMP_DIR}/out_file_test.0", path
    end

    test 'normal path when append option is true' do
      d = create_driver %[
        directory #{TMP_DIR}
        append true
      ]
      path = d.instance.write(@c)
      assert_equal "#{TMP_DIR}/dump.bin", path
    end

    data(
      invalid_tag: [/tag/, '${tag}'],
      invalid_tag0: [/tag\[0\]/, '${tag[0]}'],
      invalid_variable: [/dummy/, '${dummy}'],
      invalid_timeformat: [/time/, '%Y%m%d'],
    )
    test 'raise an error when basename includes incompatible placeholder' do |(expected_message, invalid_basename)|
      c = Fluent::Test::Driver::Output.new(Fluent::Plugin::SecondaryFileOutput)
      c.instance.acts_as_secondary(DummyOutput.new)

      assert_raise_message(expected_message) do
        c.configure %[
          directory #{TMP_DIR}/
          basename #{invalid_basename}
          compress gzip
        ]
      end
    end

    data(
      invalid_tag: [/tag/, '${tag}'],
      invalid_tag0: [/tag\[0\]/, '${tag[0]}'],
      invalid_variable: [/dummy/, '${dummy}'],
      invalid_timeformat: [/time/, '%Y%m%d'],
    )
    test 'raise an error when directory includes incompatible placeholder' do |(expected_message, invalid_directory)|
      c = Fluent::Test::Driver::Output.new(Fluent::Plugin::SecondaryFileOutput)
      c.instance.acts_as_secondary(DummyOutput.new)

      assert_raise_message(expected_message) do
        c.configure %[
          directory #{invalid_directory}/
          compress gzip
        ]
      end
    end

    test 'basename includes tag' do
      primary = create_primary(config_element('buffer', 'tag'))

      d = create_driver(%[
        directory #{TMP_DIR}/
        basename cool_${tag}
        compress gzip
      ], primary)

      m = primary.buffer.new_metadata(tag: 'test.dummy')
      c = create_chunk(primary, m, @es)

      path = d.instance.write(c)
      assert_equal "#{TMP_DIR}/cool_test.dummy.0.gz", path
    end

    test 'basename includes /tag[\d+]/' do
      primary = create_primary(config_element('buffer', 'tag'))

      d = create_driver(%[
        directory #{TMP_DIR}/
        basename cool_${tag[0]}_${tag[1]}
        compress gzip
      ], primary)

      m = primary.buffer.new_metadata(tag: 'test.dummy')
      c = create_chunk(primary, m, @es)

      path = d.instance.write(c)
      assert_equal "#{TMP_DIR}/cool_test_dummy.0.gz", path
    end

    test 'basename includes time format' do
      primary = create_primary(
        config_element('buffer', 'time', { 'timekey_zone' => '+0900', 'timekey' => 1 })
      )

      d = create_driver(%[
        directory #{TMP_DIR}/
        basename cool_%Y%m%d%H
        compress gzip
      ], primary)

      m = primary.buffer.new_metadata(timekey: event_time("2011-01-02 13:14:15 UTC"))
      c = create_chunk(primary, m, @es)

      path = d.instance.write(c)
      assert_equal "#{TMP_DIR}/cool_2011010222.0.gz", path
    end

    test 'basename includes time format with timekey_use_utc option' do
      primary = create_primary(
        config_element('buffer', 'time', { 'timekey_zone' => '+0900', 'timekey' => 1, 'timekey_use_utc' => true })
      )

      d = create_driver(%[
        directory #{TMP_DIR}/
        basename cool_%Y%m%d%H
        compress gzip
      ], primary)

      m = primary.buffer.new_metadata(timekey: event_time("2011-01-02 13:14:15 UTC"))
      c = create_chunk(primary, m, @es)

      path = d.instance.write(c)
      assert_equal "#{TMP_DIR}/cool_2011010213.0.gz", path
    end

    test 'basename includes variable' do
      primary = create_primary(config_element('buffer', 'test1'))

      d = create_driver(%[
        directory #{TMP_DIR}/
        basename cool_${test1}
        compress gzip
      ], primary)

      m = primary.buffer.new_metadata(variables: { "test1".to_sym => "dummy" })
      c = create_chunk(primary, m, @es)

      path = d.instance.write(c)
      assert_equal "#{TMP_DIR}/cool_dummy.0.gz", path
    end

    test 'basename includes unnecessary variable' do
      primary = create_primary(config_element('buffer', 'test1'))
      c = Fluent::Test::Driver::Output.new(Fluent::Plugin::SecondaryFileOutput)
      c.instance.acts_as_secondary(primary)

      assert_raise_message(/test2/) do
        c.configure %[
          directory #{TMP_DIR}/
          basename ${test1}_${test2}
          compress gzip
        ]
      end
    end

    test 'basename includes tag, time format, and variables' do
      primary = create_primary(
        config_element('buffer', 'time,tag,test1', { 'timekey_zone' => '+0000', 'timekey' => 1 })
      )

      d = create_driver(%[
        directory #{TMP_DIR}/
        basename cool_%Y%m%d%H_${tag}_${test1}
        compress gzip
      ], primary)

      m = primary.buffer.new_metadata(
        timekey: event_time("2011-01-02 13:14:15 UTC"),
        tag: 'test.tag',
        variables: { "test1".to_sym => "dummy" }
      )

      c = create_chunk(primary, m, @es)

      path = d.instance.write(c)
      assert_equal "#{TMP_DIR}/cool_2011010213_test.tag_dummy.0.gz", path
    end

    test 'directory includes tag, time format, and variables' do
      primary = create_primary(
        config_element('buffer', 'time,tag,test1', { 'timekey_zone' => '+0000', 'timekey' => 1 })
      )

      d = create_driver(%[
        directory #{TMP_DIR}/%Y%m%d%H/${tag}/${test1}
        compress gzip
      ], primary)

      m = primary.buffer.new_metadata(
        timekey: event_time("2011-01-02 13:14:15 UTC"),
        tag: 'test.tag',
        variables: { "test1".to_sym => "dummy" }
      )
      c = create_chunk(primary, m, @es)

      path = d.instance.write(c)
      assert_equal "#{TMP_DIR}/2011010213/test.tag/dummy/dump.bin.0.gz", path
    end
  end
end