Sha256: 2977cb6d65d18eb61650c00946ff1b73ac6be11aa7c3ea26956ca3e3b3afab91

Contents?: true

Size: 1.91 KB

Versions: 6

Compression:

Stored size: 1.91 KB

Contents

require "logstash/plugin"
require "logstash/inputs/kinesis"
require "logstash/codecs/json"
require "json"

RSpec.describe "LogStash::Inputs::Kinesis::Worker" do
  subject!(:worker) { LogStash::Inputs::Kinesis::Worker.new(codec, queue, decorator, checkpoint_interval) }
  let(:codec) { LogStash::Codecs::JSON.new() }
  let(:queue) { Queue.new }
  let(:decorator) { proc { |x| x["decorated"] = true; x } }
  let(:checkpoint_interval) { 120 }
  let(:checkpointer) { double('checkpointer', checkpoint: nil) }
  let(:shard_id) { "xyz" }

  it "honors the initialize java interface method contract" do
    expect { worker.initialize(shard_id) }.to_not raise_error
  end

  def record(hash = { "message" => "test" })
    encoder = java.nio.charset::Charset.forName("UTF-8").newEncoder()
    data = encoder.encode(java.nio.CharBuffer.wrap(JSON.generate(hash)))
    double(getData: data)
  end

  let(:record1) { record(id: "record1", message: "test1") }
  let(:record2) { record(id: "record2", message: "test2") }

  context "initialized" do
    before do
      worker.initialize(shard_id)
    end

    describe "#processRecords" do
      it "decodes and queues each record with decoration" do
        worker.processRecords([record1, record2], checkpointer)
        m1 = queue.pop
        m2 = queue.pop
        expect(m1).to be_kind_of(LogStash::Event)
        expect(m2).to be_kind_of(LogStash::Event)
        expect(m1['id']).to eq("record1")
        expect(m1['message']).to eq("test1")
        expect(m1['decorated']).to eq(true)
      end

      it "checkpoints on interval" do
        expect(checkpointer).to receive(:checkpoint).once
        worker.processRecords([], checkpointer)

        # not this time
        worker.processRecords([], checkpointer)

        allow(Time).to receive(:now).and_return(Time.now + 125)
        expect(checkpointer).to receive(:checkpoint).once
        worker.processRecords([], checkpointer)
      end
    end
  end
end

Version data entries

6 entries across 6 versions & 2 rubygems

Version Path
logstash-input-kinesis-cloudwatch-log-subscription-1.3.6-java spec/inputs/kinesis/worker_spec.rb
logstash-input-kinesis-cloudwatch-log-subscription-1.3.5-java spec/inputs/kinesis/worker_spec.rb
logstash-input-kinesis-cloudwatch-log-subscription-1.3.4-java spec/inputs/kinesis/worker_spec.rb
logstash-input-kinesis-cloudwatch-log-subscription-1.3.3-java spec/inputs/kinesis/worker_spec.rb
logstash-input-kinesis-cloudwatch-log-subscription-1.3.2-java spec/inputs/kinesis/worker_spec.rb
logstash-input-kinesis-1.3.1-java spec/inputs/kinesis/worker_spec.rb