require "logstash/plugin"
require "logstash/inputs/kinesis"
require "logstash/codecs/json"

RSpec.describe "inputs/kinesis" do
  # Shorthand for the KCL worker Java package; the examples below use it to
  # reach InitialPositionInStream.
  # NOTE(review): a constant assigned inside a block lands in the enclosing
  # lexical scope (top level here), not on the example group -- confirm no
  # other spec file defines a conflicting KCL.
  KCL = com.amazonaws.services.kinesis.clientlibrary.lib.worker

  # Settings common to every configuration variant in this spec. This hash is
  # never handed to a plugin itself; each variant below derives a fresh hash
  # from it, so a variant only spells out what makes it different.
  let(:base_config) do
    {
      "application_name" => "my-processor",
      "kinesis_stream_name" => "run-specs",
      "codec" => codec,
      "metrics" => metrics,
      "checkpoint_interval_seconds" => 120,
      "region" => "ap-southeast-1",
      "profile" => nil
    }
  end

  # Default configuration: no profile and no optional settings.
  let(:config) { base_config.dup }

  # Config hash to test credentials provider to be used if profile is specified
  let(:config_with_profile) do
    base_config.merge("profile" => "my-aws-profile")
  end

  # other config with LATEST as initial_position_in_stream
  let(:config_with_latest) do
    base_config.merge("initial_position_in_stream" => "LATEST")
  end

  # Config hash to test valid additional_settings
  let(:config_with_valid_additional_settings) do
    base_config.merge(
      "additional_settings" => {
        "initial_lease_table_read_capacity" => 25,
        "initial_lease_table_write_capacity" => 100,
        "kinesis_endpoint" => "http://localhost"
      }
    )
  end

  # Config hash to test invalid additional_settings where the name is not found
  let(:config_with_invalid_additional_settings_name_not_found) do
    base_config.merge("additional_settings" => { "foo" => "bar" })
  end

  # Config hash to test invalid additional_settings where the type is complex or wrong
  let(:config_with_invalid_additional_settings_wrong_type) do
    base_config.merge(
      "additional_settings" => { "metrics_level" => "invalid_metrics_level" }
    )
  end

  # Plugin under test, built from the default configuration. The bang form
  # makes RSpec instantiate it before every example.
  subject!(:kinesis) do
    LogStash::Inputs::Kinesis.new(config)
  end

  # Fixture values referenced by the configuration hashes.
  let(:codec) { LogStash::Codecs::JSON.new }
  let(:metrics) { nil }

  # Queue handed to the plugin in the #run / #stop / factory examples.
  let(:queue) { Queue.new }

  # Test doubles standing in for the KCL worker and the builder that yields it.
  let(:kcl_worker) { double("kcl_worker") }
  let(:stub_builder) { double("kcl_builder", build: kcl_worker) }

  # Looks the plugin up through the Logstash registry (rather than using the
  # class constant) and checks that a minimal configuration registers cleanly.
  it "registers without error" do
    plugin_class = LogStash::Plugin.lookup("input", "kinesis")
    minimal_settings = { "kinesis_stream_name" => "specs", "codec" => codec }
    input = plugin_class.new(minimal_settings)

    expect { input.register }.not_to raise_error
  end

  # With the default configuration the KCL config should mirror the settings
  # hash, start from TRIM_HORIZON and use the default credentials chain.
  it "configures the KCL" do
    kinesis.register
    kcl_config = kinesis.kcl_config
    credentials_class = kcl_config.get_kinesis_credentials_provider.getClass.to_s

    expect(kcl_config.applicationName).to eq("my-processor")
    expect(kcl_config.streamName).to eq("run-specs")
    expect(kcl_config.regionName).to eq("ap-southeast-1")
    expect(kcl_config.initialPositionInStream).to eq(KCL::InitialPositionInStream::TRIM_HORIZON)
    expect(credentials_class).to eq("com.amazonaws.auth.DefaultAWSCredentialsProviderChain")
  end

  # Plugin configured with an AWS profile. Declared with `let` rather than a
  # second `subject!`: an additional subject in the same group would redefine
  # `subject` and be instantiated before every example, even unrelated ones.
  let(:kinesis_with_profile) { LogStash::Inputs::Kinesis.new(config_with_profile) }

  it "uses ProfileCredentialsProvider if profile is specified" do
    kinesis_with_profile.register
    provider = kinesis_with_profile.kcl_config.get_kinesis_credentials_provider
    expect(provider.getClass.to_s).to eq("com.amazonaws.auth.profile.ProfileCredentialsProvider")
  end

  # Plugin configured to start reading at LATEST. Declared with `let` rather
  # than another `subject!`, which would redefine `subject` and be built
  # eagerly before every example in the group.
  let(:kinesis_with_latest) { LogStash::Inputs::Kinesis.new(config_with_latest) }

  # The description names the variant so a failure report is not confused
  # with the default-configuration example.
  it "configures the KCL with LATEST as the initial position in stream" do
    kinesis_with_latest.register
    kcl_config = kinesis_with_latest.kcl_config

    expect(kcl_config.applicationName).to eq("my-processor")
    expect(kcl_config.streamName).to eq("run-specs")
    expect(kcl_config.regionName).to eq("ap-southeast-1")
    expect(kcl_config.initialPositionInStream).to eq(KCL::InitialPositionInStream::LATEST)
    expect(kcl_config.get_kinesis_credentials_provider.getClass.to_s).to eq("com.amazonaws.auth.DefaultAWSCredentialsProviderChain")
  end

  # Plugin configured with well-formed additional_settings. Declared with
  # `let` rather than another `subject!`, which would redefine `subject` and
  # be built eagerly before every example in the group.
  let(:kinesis_with_valid_additional_settings) do
    LogStash::Inputs::Kinesis.new(config_with_valid_additional_settings)
  end

  # The description names the variant so a failure report is not confused
  # with the default-configuration example.
  it "configures the KCL with valid additional_settings" do
    kinesis_with_valid_additional_settings.register
    kcl_config = kinesis_with_valid_additional_settings.kcl_config

    expect(kcl_config.applicationName).to eq("my-processor")
    expect(kcl_config.streamName).to eq("run-specs")
    expect(kcl_config.regionName).to eq("ap-southeast-1")
    expect(kcl_config.initialLeaseTableReadCapacity).to eq(25)
    expect(kcl_config.initialLeaseTableWriteCapacity).to eq(100)
    expect(kcl_config.kinesisEndpoint).to eq("http://localhost")
  end


  # Plugin configured with an additional_settings key the examples expect to
  # be rejected. Declared with `let` rather than another `subject!`, which
  # would redefine `subject` and be built eagerly before every example.
  let(:kinesis_with_invalid_additional_settings_name_not_found) do
    LogStash::Inputs::Kinesis.new(config_with_invalid_additional_settings_name_not_found)
  end

  it "raises NoMethodError for invalid configuration options" do
    # Build the plugin outside the expectation block so only an error raised
    # by #register (not by construction) can satisfy the matcher.
    input = kinesis_with_invalid_additional_settings_name_not_found
    expect { input.register }.to raise_error(NoMethodError)
  end


  # Plugin configured with an additional_settings value the examples expect
  # to be rejected. Declared with `let` rather than another `subject!`, which
  # would redefine `subject` and be built eagerly before every example.
  let(:kinesis_with_invalid_additional_settings_wrong_type) do
    LogStash::Inputs::Kinesis.new(config_with_invalid_additional_settings_wrong_type)
  end

  it "raises an error for invalid configuration values such as the wrong type" do
    # Build the plugin outside the expectation block so only an error raised
    # by #register (not by construction) can satisfy the matcher.
    input = kinesis_with_invalid_additional_settings_wrong_type
    expect { input.register }.to raise_error(Java::JavaLang::IllegalArgumentException)
  end


  context "#run" do
    # The real builder is swapped for a stub so that #run can be observed
    # driving the worker without starting an actual KCL worker.
    it "runs the KCL worker" do
      expect(kinesis).to receive(:kcl_builder).with(queue).and_return(stub_builder)
      expect(kcl_worker).to receive(:run).with(no_args)
      kinesis.run(queue)
    end
  end

  context "#stop" do
    # Runs the plugin against a stubbed builder, then stops it and checks the
    # worker double was told both to run and to shut down.
    it "stops the KCL worker" do
      expect(kinesis).to receive(:kcl_builder).with(queue).and_return(stub_builder)
      [:run, :shutdown].each do |message|
        expect(kcl_worker).to receive(message).with(no_args)
      end

      kinesis.run(queue)
      # do_stop calls stop internally
      kinesis.do_stop
    end
  end

  context "#worker_factory" do
    # One worker produced by invoking the factory returned for the queue.
    let(:worker) { kinesis.worker_factory(queue).call }

    it "clones the codec for each worker" do
      expect(worker).to be_kind_of(LogStash::Inputs::Kinesis::Worker)

      worker_codec = worker.codec
      expect(worker_codec).not_to eq(kinesis.codec)
      expect(worker_codec).to be_kind_of(codec.class)
    end

    it "generates a valid worker" do
      expect(worker.codec).to be_kind_of(codec.class)
      expect(worker.checkpoint_interval).to eq(120)
      expect(worker.output_queue).to eq(queue)
      expect(worker.decorator).to eq(kinesis.method(:decorate))
      expect(worker.logger).to eq(kinesis.logger)
    end
  end

  # these tests are heavily dependent on the current Worker::Builder
  # implementation because its state is all private
  context "#kcl_builder" do
    # Builder under inspection; its fields are read reflectively via `field`.
    let(:builder) { kinesis.kcl_builder(queue) }

    it "sets the worker factory" do
      factory = field(builder, "recordProcessorFactory")
      expect(factory).not_to eq(nil)
    end

    it "sets the config" do
      # Register first, then read the builder's config field.
      kinesis.register
      expect(field(builder, "config")).to eq(kinesis.kcl_config)
    end

    it "disables metric tracking by default" do
      metrics_factory = field(builder, "metricsFactory")
      expect(metrics_factory).to be_kind_of(com.amazonaws.services.kinesis.metrics.impl::NullMetricsFactory)
    end

    context "cloudwatch" do
      let(:metrics) { "cloudwatch" }

      it "uses cloudwatch metrics if specified" do
        # The behaviour that results from this setting lives in private
        # methods and cannot be tested from here, so only the value held by
        # the field is checked, not what setting it leads to.
        metrics_factory = field(builder, "metricsFactory")
        expect(metrics_factory).to eq(nil)
      end
    end
  end

  # Reads a declared field of a Java object by name through its Java class.
  # The field is marked accessible before it is read, so fields that are not
  # publicly readable can still be inspected.
  #
  # obj  - object responding to #java_class
  # name - name of the declared field to read
  #
  # Returns whatever value the field holds on obj.
  def field(obj, name)
    obj.java_class
       .declared_field(name)
       .tap { |java_field| java_field.accessible = true }
       .value(obj)
  end
end