require 'logstash-input-kinesis_jars'
require "logstash/plugin"
require "logstash/inputs/kinesis"
require "logstash/codecs/json"
require "json"

RSpec.describe "LogStash::Inputs::Kinesis::Worker" do
  KCL_TYPES = com.amazonaws.services.kinesis.clientlibrary.types

  subject!(:worker) { LogStash::Inputs::Kinesis::Worker.new(codec, queue, decorator, checkpoint_interval) }
  let(:codec) { LogStash::Codecs::JSON.new() }
  let(:queue) { Queue.new }
  let(:decorator) { proc { |x| x["decorated"] = true; x } }
  let(:checkpoint_interval) { 120 }
  let(:checkpointer) { double('checkpointer', checkpoint: nil) }
  let(:init_input) { KCL_TYPES::InitializationInput.new().withShardId("xyz") }

  it "honors the initialize java interface method contract" do
    expect { worker.initialize(init_input) }.to_not raise_error
  end

  def record(hash = { "message" => "test" })
    encoder = java.nio.charset::Charset.forName("UTF-8").newEncoder()
    data = encoder.encode(java.nio.CharBuffer.wrap(JSON.generate(hash)))
    double(getData: data)
  end

  let(:process_input) {
    KCL_TYPES::ProcessRecordsInput.new()
        .withRecords(java.util.Arrays.asList([
            record(id: "record1", message: "test1"),
            record(id: "record2", message: "test2")
          ].to_java)
        )
        .withCheckpointer(checkpointer)
  }
  let(:empty_process_input) {
    KCL_TYPES::ProcessRecordsInput.new()
        .withRecords(java.util.Arrays.asList([].to_java))
        .withCheckpointer(checkpointer)
  }

  context "initialized" do
    before do
      worker.initialize(init_input)
    end

    describe "#processRecords" do
      it "decodes and queues each record with decoration" do
        worker.processRecords(process_input)
        m1 = queue.pop
        m2 = queue.pop
        expect(m1).to be_kind_of(LogStash::Event)
        expect(m2).to be_kind_of(LogStash::Event)
        expect(m1['id']).to eq("record1")
        expect(m1['message']).to eq("test1")
        expect(m1['decorated']).to eq(true)
      end

      it "checkpoints on interval" do
        expect(checkpointer).to receive(:checkpoint).once
        worker.processRecords(empty_process_input)

        # not this time
        worker.processRecords(empty_process_input)

        allow(Time).to receive(:now).and_return(Time.now + 125)
        expect(checkpointer).to receive(:checkpoint).once
        worker.processRecords(empty_process_input)
      end
    end

    describe "#shutdown" do
      it "checkpoints on termination" do
        input = KCL_TYPES::ShutdownInput.new
        checkpointer = double('checkpointer')
        expect(checkpointer).to receive(:checkpoint)
        input.
          with_shutdown_reason(KCL_TYPES::ShutdownReason::TERMINATE).
          with_checkpointer(checkpointer)
        worker.shutdown(input)
      end
    end
  end
end