require 'fluent/test'

# Unit tests for Fluent::CopyOutput: configuration of its downstream outputs
# and delivery of every emitted event to each of them.
class CopyOutputTest < Test::Unit::TestCase
  def setup
    Fluent::Test.setup
  end

  # Three <store> sections, each declaring one "test" output (c0, c1, c2).
  # test_configure expects exactly these three outputs, in this order.
  CONFIG = %[
    <store>
      type test
      name c0
    </store>
    <store>
      type test
      name c1
    </store>
    <store>
      type test
      name c2
    </store>
  ]

  # Builds an output test driver around Fluent::CopyOutput configured with
  # +conf+ (defaults to CONFIG).
  def create_driver(conf = CONFIG)
    Fluent::Test::OutputTestDriver.new(Fluent::CopyOutput).configure(conf)
  end

  # Every configured store must become a Fluent::TestOutput carrying the
  # configured name, preserving declaration order.
  def test_configure
    d = create_driver

    outputs = d.instance.outputs
    assert_equal 3, outputs.size

    outputs.each_with_index do |output, index|
      assert_equal Fluent::TestOutput, output.class
      assert_equal "c#{index}", output.name
    end
  end

  # Events emitted through the driver must reach every downstream output.
  def test_emit
    d = create_driver

    time = Time.parse("2011-01-02 13:14:15 UTC").to_i
    d.emit({"a"=>1}, time)
    d.emit({"a"=>2}, time)

    expected = [
      [time, {"a"=>1}],
      [time, {"a"=>2}],
    ]
    d.instance.outputs.each { |o| assert_equal expected, o.events }
  end

  # Feeds a MessagePackEventStream to two outputs whose #emit iterates the
  # stream itself, and checks that each output still sees both events.
  # NOTE(review): presumably guards against the stream being readable only by
  # the first output -- confirm against the original bug report.
  def test_msgpack_es_emit_bug
    d = Fluent::Test::OutputTestDriver.new(Fluent::CopyOutput)

    outputs = %w(p1 p2).map do |pname|
      output = Fluent::Plugin.new_output('test')
      output.configure('name' => pname)
      # Override emit on this instance only: walk the incoming event stream
      # and forward each event individually to the original implementation.
      output.define_singleton_method(:emit) do |tag, es, chain|
        es.each do |event_time, record|
          super(tag, [[event_time, record]], chain)
        end
      end
      output
    end

    # Inject the hand-built outputs directly, bypassing #configure.
    d.instance.instance_variable_set(:@outputs, outputs)

    # Assigned before the branch below so that BOTH branches (and the final
    # assertions) use a real timestamp; previously the fallback branch packed
    # events with a nil time.
    time = Time.parse("2013-05-26 06:37:22 UTC").to_i

    es = if defined?(MessagePack::Packer)
           packer = MessagePack::Packer.new
           packer.pack([time, {"a" => 1}])
           packer.pack([time, {"a" => 2}])
           Fluent::MessagePackEventStream.new(packer.to_s)
         else
           # Fallback when MessagePack::Packer is not defined: concatenate the
           # individually serialized events.
           events = "#{[time, {"a" => 1}].to_msgpack}#{[time, {"a" => 2}].to_msgpack}"
           Fluent::MessagePackEventStream.new(events)
         end

    d.instance.emit('test', es, Fluent::NullOutputChain.instance)

    expected = [
      [time, {"a"=>1}],
      [time, {"a"=>2}],
    ]
    d.instance.outputs.each { |o| assert_equal expected, o.events }
  end
end