# encoding: utf-8 require_relative "../spec_helper" require "logstash/plugin" require "logstash/event" require "msgpack" describe LogStash::Codecs::Fluent do before do @factory = MessagePack::Factory.new @factory.register_type(LogStash::Codecs::Fluent::EventTime::TYPE, LogStash::Codecs::Fluent::EventTime) @packer = @factory.packer @unpacker = @factory.unpacker end let(:properties) { {:name => "foo" } } let(:event) { LogStash::Event.new(properties) } it "should register without errors" do plugin = LogStash::Plugin.lookup("codec", "fluent").new expect { plugin.register }.to_not raise_error end describe "event encoding" do it "should encode as message pack format" do subject.on_event do |event, data| @unpacker.feed_each(data) do |fields| expect(fields[0]).to eq("log") expect(fields[2]["name"]).to eq("foo") end end subject.encode(event) end end describe "event encoding with EventTime" do subject { LogStash::Plugin.lookup("codec", "fluent").new({"nanosecond_precision" => true}) } it "should encode as message pack format" do subject.on_event do |event, data| @unpacker.feed_each(data) do |fields| expect(fields[0]).to eq("log") expect(fields[2]["name"]).to eq("foo") end end subject.encode(event) end end describe "event decoding" do let(:tag) { "mytag" } let(:epochtime) { event.timestamp.to_i } let(:data) { LogStash::Util.normalize(event.to_hash) } let(:message) do @packer.pack([tag, epochtime, data.merge(LogStash::Event::TIMESTAMP => event.timestamp.to_iso8601)]) end it "should decode without errors" do subject.decode(message) do |event| expect(event.get("name")).to eq("foo") end end end describe "event decoding with EventTime" do let(:tag) { "mytag" } let(:epochtime) { LogStash::Codecs::Fluent::EventTime.new(event.timestamp.to_i, event.timestamp.usec * 1000) } let(:data) { LogStash::Util.normalize(event.to_hash) } let(:message) do @packer.pack([tag, epochtime, data.merge(LogStash::Event::TIMESTAMP => event.timestamp.to_iso8601)]) end subject { LogStash::Plugin.lookup("codec", "fluent").new({"nanosecond_precision" => true}) } it "should decode without errors" do subject.decode(message) do |event| expect(event.get("name")).to eq("foo") end end end describe "forward protocol tag" do let(:event) { LogStash::Event.new(properties) } subject { LogStash::Plugin.lookup("codec", "fluent").new } describe "when passing Array value" do let(:properties) { {:tags => ["test", "logstash"], :name => "foo" } } it "should be joined with '.'" do subject.forwardable_tag(event) do |tag| expect(tag).to eq("test.logstash") end end end describe "when passing String value" do let(:properties) { {:tags => "test.logstash", :name => "foo" } } it "should be pass-through" do subject.forwardable_tag(event) do |tag| expect(tag).to eq("test.logstash") end end end describe "when passing other value" do let(:properties) { {:tags => :symbol, :name => "foo" } } it "should be called to_s" do subject.forwardable_tag(event) do |tag| expect(tag).to eq("symbol") end end end end describe "event decoding (buckets of events)" do let(:tag) { "mytag" } let(:epochtime) { event.timestamp.to_i } let(:data) { LogStash::Util.normalize(event.to_hash) } let(:message) do @packer.pack([tag, [ [epochtime, data.merge(LogStash::Event::TIMESTAMP => event.timestamp.to_iso8601)], [epochtime, data.merge(LogStash::Event::TIMESTAMP => event.timestamp.to_iso8601)], [epochtime, data.merge(LogStash::Event::TIMESTAMP => event.timestamp.to_iso8601)] ] ]) end it "should decode without errors" do count = 0 subject.decode(message) do |event| expect(event.get("name")).to eq("foo") count += 1 end expect(count).to eq(3) end end describe "event decoding (broken package)" do let(:tag) { "mytag" } let(:epochtime) { event.timestamp.to_s } let(:data) { LogStash::Util.normalize(event.to_hash) } let(:message) do MessagePack.pack([tag, epochtime, data.merge(LogStash::Event::TIMESTAMP => event.timestamp.to_iso8601) ]) end it "should decode with errors" do subject.decode(message) do |event| expect(event.get("name")).not_to eq("foo") end end it "should inject a failure event" do subject.decode(message) do |event| expect(event.get("tags")).to include("_fluentparsefailure") end end end end