describe Fluent::UnitTimeFilterOutput do let(:time) { Time.parse('2014-02-08 13:14:15 +0900').to_i } describe 'when 2 records transmitted per second' do it 'should be aggregated to two records' do run_driver do |d| (0...10).each do |i| d.emit({"key#{i}" => "val#{i}"}, time + i) d.emit({"key#{i}_" => "val#{i}_"}, time + i) end expect(d.emits).to eq( [["filtered.test.default", 1391832855, {"count"=>2, "inspect"=>"[[\"test.default\", 1391832855, {\"key0\"=>\"val0\"}], [\"test.default\", 1391832855, {\"key0_\"=>\"val0_\"}]]"}], ["filtered.test.default", 1391832856, {"count"=>2, "inspect"=>"[[\"test.default\", 1391832856, {\"key1\"=>\"val1\"}], [\"test.default\", 1391832856, {\"key1_\"=>\"val1_\"}]]"}], ["filtered.test.default", 1391832857, {"count"=>2, "inspect"=>"[[\"test.default\", 1391832857, {\"key2\"=>\"val2\"}], [\"test.default\", 1391832857, {\"key2_\"=>\"val2_\"}]]"}], ["filtered.test.default", 1391832858, {"count"=>2, "inspect"=>"[[\"test.default\", 1391832858, {\"key3\"=>\"val3\"}], [\"test.default\", 1391832858, {\"key3_\"=>\"val3_\"}]]"}], ["filtered.test.default", 1391832859, {"count"=>2, "inspect"=>"[[\"test.default\", 1391832859, {\"key4\"=>\"val4\"}], [\"test.default\", 1391832859, {\"key4_\"=>\"val4_\"}]]"}], ["filtered.test.default", 1391832860, {"count"=>2, "inspect"=>"[[\"test.default\", 1391832860, {\"key5\"=>\"val5\"}], [\"test.default\", 1391832860, {\"key5_\"=>\"val5_\"}]]"}], ["filtered.test.default", 1391832861, {"count"=>2, "inspect"=>"[[\"test.default\", 1391832861, {\"key6\"=>\"val6\"}], [\"test.default\", 1391832861, {\"key6_\"=>\"val6_\"}]]"}], ["filtered.test.default", 1391832862, {"count"=>2, "inspect"=>"[[\"test.default\", 1391832862, {\"key7\"=>\"val7\"}], [\"test.default\", 1391832862, {\"key7_\"=>\"val7_\"}]]"}]] ) end end it 'should be rewritten to the specified tag' do run_driver(:prefix => 'any_prefix') do |d| (0...10).each do |i| d.emit({"key#{i}" => "val#{i}"}, time + i) d.emit({"key#{i}_" => "val#{i}_"}, time + i) end expect(d.emits).to eq( [["any_prefix.test.default", 1391832855, {"count"=>2, "inspect"=>"[[\"test.default\", 1391832855, {\"key0\"=>\"val0\"}], [\"test.default\", 1391832855, {\"key0_\"=>\"val0_\"}]]"}], ["any_prefix.test.default", 1391832856, {"count"=>2, "inspect"=>"[[\"test.default\", 1391832856, {\"key1\"=>\"val1\"}], [\"test.default\", 1391832856, {\"key1_\"=>\"val1_\"}]]"}], ["any_prefix.test.default", 1391832857, {"count"=>2, "inspect"=>"[[\"test.default\", 1391832857, {\"key2\"=>\"val2\"}], [\"test.default\", 1391832857, {\"key2_\"=>\"val2_\"}]]"}], ["any_prefix.test.default", 1391832858, {"count"=>2, "inspect"=>"[[\"test.default\", 1391832858, {\"key3\"=>\"val3\"}], [\"test.default\", 1391832858, {\"key3_\"=>\"val3_\"}]]"}], ["any_prefix.test.default", 1391832859, {"count"=>2, "inspect"=>"[[\"test.default\", 1391832859, {\"key4\"=>\"val4\"}], [\"test.default\", 1391832859, {\"key4_\"=>\"val4_\"}]]"}], ["any_prefix.test.default", 1391832860, {"count"=>2, "inspect"=>"[[\"test.default\", 1391832860, {\"key5\"=>\"val5\"}], [\"test.default\", 1391832860, {\"key5_\"=>\"val5_\"}]]"}], ["any_prefix.test.default", 1391832861, {"count"=>2, "inspect"=>"[[\"test.default\", 1391832861, {\"key6\"=>\"val6\"}], [\"test.default\", 1391832861, {\"key6_\"=>\"val6_\"}]]"}], ["any_prefix.test.default", 1391832862, {"count"=>2, "inspect"=>"[[\"test.default\", 1391832862, {\"key7\"=>\"val7\"}], [\"test.default\", 1391832862, {\"key7_\"=>\"val7_\"}]]"}]] ) end end it 'should receive multiple converted records' do filter = <<-EOS proc {|rs| [ {'foo' => 'bar', 100 => 200}, {'bar' => 'baz', 200 => 300}, ] } EOS run_driver(:filter => filter) do |d| (0...10).each do |i| d.emit({"key#{i}" => "val#{i}"}, time + i) d.emit({"key#{i}_" => "val#{i}_"}, time + i) end expect(d.emits).to eq( [["filtered.test.default", 1391832855, {"foo"=>"bar", 100=>200}], ["filtered.test.default", 1391832855, {"bar"=>"baz", 200=>300}], ["filtered.test.default", 1391832856, {"foo"=>"bar", 100=>200}], ["filtered.test.default", 1391832856, {"bar"=>"baz", 200=>300}], ["filtered.test.default", 1391832857, {"foo"=>"bar", 100=>200}], ["filtered.test.default", 1391832857, {"bar"=>"baz", 200=>300}], ["filtered.test.default", 1391832858, {"foo"=>"bar", 100=>200}], ["filtered.test.default", 1391832858, {"bar"=>"baz", 200=>300}], ["filtered.test.default", 1391832859, {"foo"=>"bar", 100=>200}], ["filtered.test.default", 1391832859, {"bar"=>"baz", 200=>300}], ["filtered.test.default", 1391832860, {"foo"=>"bar", 100=>200}], ["filtered.test.default", 1391832860, {"bar"=>"baz", 200=>300}], ["filtered.test.default", 1391832861, {"foo"=>"bar", 100=>200}], ["filtered.test.default", 1391832861, {"bar"=>"baz", 200=>300}], ["filtered.test.default", 1391832862, {"foo"=>"bar", 100=>200}], ["filtered.test.default", 1391832862, {"bar"=>"baz", 200=>300}]] ) end end end describe 'when 3 records transmitted per second' do it 'should be aggregated to two records' do run_driver do |d| (0...10).each do |i| d.emit({"key#{i}" => "val#{i}"}, time + i) d.emit({"key#{i}_" => "val#{i}_"}, time + i) d.emit({"key#{i}__" => "val#{i}__"}, time + i) end expect(d.emits).to eq( [["filtered.test.default", 1391832855, {"count"=>3, "inspect"=>"[[\"test.default\", 1391832855, {\"key0\"=>\"val0\"}], [\"test.default\", 1391832855, {\"key0_\"=>\"val0_\"}], [\"test.default\", 1391832855, {\"key0__\"=>\"val0__\"}]]"}], ["filtered.test.default", 1391832856, {"count"=>3, "inspect"=>"[[\"test.default\", 1391832856, {\"key1\"=>\"val1\"}], [\"test.default\", 1391832856, {\"key1_\"=>\"val1_\"}], [\"test.default\", 1391832856, {\"key1__\"=>\"val1__\"}]]"}], ["filtered.test.default", 1391832857, {"count"=>3, "inspect"=>"[[\"test.default\", 1391832857, {\"key2\"=>\"val2\"}], [\"test.default\", 1391832857, {\"key2_\"=>\"val2_\"}], [\"test.default\", 1391832857, {\"key2__\"=>\"val2__\"}]]"}], ["filtered.test.default", 1391832858, {"count"=>3, "inspect"=>"[[\"test.default\", 1391832858, {\"key3\"=>\"val3\"}], [\"test.default\", 1391832858, {\"key3_\"=>\"val3_\"}], [\"test.default\", 1391832858, {\"key3__\"=>\"val3__\"}]]"}], ["filtered.test.default", 1391832859, {"count"=>3, "inspect"=>"[[\"test.default\", 1391832859, {\"key4\"=>\"val4\"}], [\"test.default\", 1391832859, {\"key4_\"=>\"val4_\"}], [\"test.default\", 1391832859, {\"key4__\"=>\"val4__\"}]]"}], ["filtered.test.default", 1391832860, {"count"=>3, "inspect"=>"[[\"test.default\", 1391832860, {\"key5\"=>\"val5\"}], [\"test.default\", 1391832860, {\"key5_\"=>\"val5_\"}], [\"test.default\", 1391832860, {\"key5__\"=>\"val5__\"}]]"}], ["filtered.test.default", 1391832861, {"count"=>3, "inspect"=>"[[\"test.default\", 1391832861, {\"key6\"=>\"val6\"}], [\"test.default\", 1391832861, {\"key6_\"=>\"val6_\"}], [\"test.default\", 1391832861, {\"key6__\"=>\"val6__\"}]]"}], ["filtered.test.default", 1391832862, {"count"=>3, "inspect"=>"[[\"test.default\", 1391832862, {\"key7\"=>\"val7\"}], [\"test.default\", 1391832862, {\"key7_\"=>\"val7_\"}], [\"test.default\", 1391832862, {\"key7__\"=>\"val7__\"}]]"}]] ) end end end describe 'when unit_sec 10' do it 'should be aggregated by every 10 sec' do run_driver(:unit_sec => 10) do |d| (0...30).step(5) do |i| d.emit({"key#{i}" => "val#{i}"}, time + i) d.emit({"key#{i}_" => "val#{i}_"}, time + i) d.emit({"key#{i}__" => "val#{i}__"}, time + i) end expect(d.emits).to eq( [["filtered.test.default", 1391832850, {"count"=>3, "inspect"=>"[[\"test.default\", 1391832855, {\"key0\"=>\"val0\"}], [\"test.default\", 1391832855, {\"key0_\"=>\"val0_\"}], [\"test.default\", 1391832855, {\"key0__\"=>\"val0__\"}]]"}], ["filtered.test.default", 1391832860, {"count"=>6, "inspect"=>"[[\"test.default\", 1391832860, {\"key5\"=>\"val5\"}], [\"test.default\", 1391832860, {\"key5_\"=>\"val5_\"}], [\"test.default\", 1391832860, {\"key5__\"=>\"val5__\"}], [\"test.default\", 1391832865, {\"key10\"=>\"val10\"}], [\"test.default\", 1391832865, {\"key10_\"=>\"val10_\"}], [\"test.default\", 1391832865, {\"key10__\"=>\"val10__\"}]]"}]] ) end end it 'should be aggregated by every 10 sec' do run_driver(:unit_sec => 10) do |d| (0...100).step(20) do |i| d.emit({"key#{i}" => "val#{i}"}, time + i) d.emit({"key#{i}_" => "val#{i}_"}, time + i) d.emit({"key#{i}__" => "val#{i}__"}, time + i) end expect(d.emits).to eq( [["filtered.test.default", 1391832850, {"count"=>3, "inspect"=>"[[\"test.default\", 1391832855, {\"key0\"=>\"val0\"}], [\"test.default\", 1391832855, {\"key0_\"=>\"val0_\"}], [\"test.default\", 1391832855, {\"key0__\"=>\"val0__\"}]]"}], ["filtered.test.default", 1391832870, {"count"=>3, "inspect"=>"[[\"test.default\", 1391832875, {\"key20\"=>\"val20\"}], [\"test.default\", 1391832875, {\"key20_\"=>\"val20_\"}], [\"test.default\", 1391832875, {\"key20__\"=>\"val20__\"}]]"}], ["filtered.test.default", 1391832890, {"count"=>3, "inspect"=>"[[\"test.default\", 1391832895, {\"key40\"=>\"val40\"}], [\"test.default\", 1391832895, {\"key40_\"=>\"val40_\"}], [\"test.default\", 1391832895, {\"key40__\"=>\"val40__\"}]]"}]] ) end end end describe 'when multiple tags' do it 'should be output only the first tag' do run_driver do |d| (0...10).each do |i| d.tag = 'test.1' d.emit({"key#{i}" => "val#{i}"}, time + i) d.tag = 'test.2' d.emit({"key#{i}_" => "val#{i}_"}, time + i) end expect(d.emits).to eq( [["filtered.test.1", 1391832855, {"count"=>2, "inspect"=>"[[\"test.1\", 1391832855, {\"key0\"=>\"val0\"}], [\"test.2\", 1391832855, {\"key0_\"=>\"val0_\"}]]"}], ["filtered.test.1", 1391832856, {"count"=>2, "inspect"=>"[[\"test.1\", 1391832856, {\"key1\"=>\"val1\"}], [\"test.2\", 1391832856, {\"key1_\"=>\"val1_\"}]]"}], ["filtered.test.1", 1391832857, {"count"=>2, "inspect"=>"[[\"test.1\", 1391832857, {\"key2\"=>\"val2\"}], [\"test.2\", 1391832857, {\"key2_\"=>\"val2_\"}]]"}], ["filtered.test.1", 1391832858, {"count"=>2, "inspect"=>"[[\"test.1\", 1391832858, {\"key3\"=>\"val3\"}], [\"test.2\", 1391832858, {\"key3_\"=>\"val3_\"}]]"}], ["filtered.test.1", 1391832859, {"count"=>2, "inspect"=>"[[\"test.1\", 1391832859, {\"key4\"=>\"val4\"}], [\"test.2\", 1391832859, {\"key4_\"=>\"val4_\"}]]"}], ["filtered.test.1", 1391832860, {"count"=>2, "inspect"=>"[[\"test.1\", 1391832860, {\"key5\"=>\"val5\"}], [\"test.2\", 1391832860, {\"key5_\"=>\"val5_\"}]]"}], ["filtered.test.1", 1391832861, {"count"=>2, "inspect"=>"[[\"test.1\", 1391832861, {\"key6\"=>\"val6\"}], [\"test.2\", 1391832861, {\"key6_\"=>\"val6_\"}]]"}], ["filtered.test.1", 1391832862, {"count"=>2, "inspect"=>"[[\"test.1\", 1391832862, {\"key7\"=>\"val7\"}], [\"test.2\", 1391832862, {\"key7_\"=>\"val7_\"}]]"}]] ) end end it 'should be output to each tag' do run_driver(:emit_each_tag => true) do |d| (0...10).each do |i| d.tag = 'test.1' d.emit({"key#{i}" => "val#{i}"}, time + i) d.tag = 'test.2' d.emit({"key#{i}_" => "val#{i}_"}, time + i) end expect(d.emits).to eq( [["filtered.test.1", 1391832855, {"count"=>2, "inspect"=>"[[\"test.1\", 1391832855, {\"key0\"=>\"val0\"}], [\"test.2\", 1391832855, {\"key0_\"=>\"val0_\"}]]"}], ["filtered.test.2", 1391832855, {"count"=>2, "inspect"=>"[[\"test.1\", 1391832855, {\"key0\"=>\"val0\"}], [\"test.2\", 1391832855, {\"key0_\"=>\"val0_\"}]]"}], ["filtered.test.1", 1391832856, {"count"=>2, "inspect"=>"[[\"test.1\", 1391832856, {\"key1\"=>\"val1\"}], [\"test.2\", 1391832856, {\"key1_\"=>\"val1_\"}]]"}], ["filtered.test.2", 1391832856, {"count"=>2, "inspect"=>"[[\"test.1\", 1391832856, {\"key1\"=>\"val1\"}], [\"test.2\", 1391832856, {\"key1_\"=>\"val1_\"}]]"}], ["filtered.test.1", 1391832857, {"count"=>2, "inspect"=>"[[\"test.1\", 1391832857, {\"key2\"=>\"val2\"}], [\"test.2\", 1391832857, {\"key2_\"=>\"val2_\"}]]"}], ["filtered.test.2", 1391832857, {"count"=>2, "inspect"=>"[[\"test.1\", 1391832857, {\"key2\"=>\"val2\"}], [\"test.2\", 1391832857, {\"key2_\"=>\"val2_\"}]]"}], ["filtered.test.1", 1391832858, {"count"=>2, "inspect"=>"[[\"test.1\", 1391832858, {\"key3\"=>\"val3\"}], [\"test.2\", 1391832858, {\"key3_\"=>\"val3_\"}]]"}], ["filtered.test.2", 1391832858, {"count"=>2, "inspect"=>"[[\"test.1\", 1391832858, {\"key3\"=>\"val3\"}], [\"test.2\", 1391832858, {\"key3_\"=>\"val3_\"}]]"}], ["filtered.test.1", 1391832859, {"count"=>2, "inspect"=>"[[\"test.1\", 1391832859, {\"key4\"=>\"val4\"}], [\"test.2\", 1391832859, {\"key4_\"=>\"val4_\"}]]"}], ["filtered.test.2", 1391832859, {"count"=>2, "inspect"=>"[[\"test.1\", 1391832859, {\"key4\"=>\"val4\"}], [\"test.2\", 1391832859, {\"key4_\"=>\"val4_\"}]]"}], ["filtered.test.1", 1391832860, {"count"=>2, "inspect"=>"[[\"test.1\", 1391832860, {\"key5\"=>\"val5\"}], [\"test.2\", 1391832860, {\"key5_\"=>\"val5_\"}]]"}], ["filtered.test.2", 1391832860, {"count"=>2, "inspect"=>"[[\"test.1\", 1391832860, {\"key5\"=>\"val5\"}], [\"test.2\", 1391832860, {\"key5_\"=>\"val5_\"}]]"}], ["filtered.test.1", 1391832861, {"count"=>2, "inspect"=>"[[\"test.1\", 1391832861, {\"key6\"=>\"val6\"}], [\"test.2\", 1391832861, {\"key6_\"=>\"val6_\"}]]"}], ["filtered.test.2", 1391832861, {"count"=>2, "inspect"=>"[[\"test.1\", 1391832861, {\"key6\"=>\"val6\"}], [\"test.2\", 1391832861, {\"key6_\"=>\"val6_\"}]]"}], ["filtered.test.1", 1391832862, {"count"=>2, "inspect"=>"[[\"test.1\", 1391832862, {\"key7\"=>\"val7\"}], [\"test.2\", 1391832862, {\"key7_\"=>\"val7_\"}]]"}], ["filtered.test.2", 1391832862, {"count"=>2, "inspect"=>"[[\"test.1\", 1391832862, {\"key7\"=>\"val7\"}], [\"test.2\", 1391832862, {\"key7_\"=>\"val7_\"}]]"}]] ) end end end describe 'when the array of Hash is passed' do it 'should receive an array of Hash' do run_driver(:pass_hash_row => true) do |d| (0...10).each do |i| d.emit({"key#{i}" => "val#{i}"}, time + i) d.emit({"key#{i}_" => "val#{i}_"}, time + i) end expect(d.emits).to eq( [["filtered.test.default", 1391832855, {"count"=>2, "inspect"=>"[{\"key0\"=>\"val0\", \"time\"=>1391832855, \"tag\"=>\"test.default\"}, {\"key0_\"=>\"val0_\", \"time\"=>1391832855, \"tag\"=>\"test.default\"}]"}], ["filtered.test.default", 1391832856, {"count"=>2, "inspect"=>"[{\"key1\"=>\"val1\", \"time\"=>1391832856, \"tag\"=>\"test.default\"}, {\"key1_\"=>\"val1_\", \"time\"=>1391832856, \"tag\"=>\"test.default\"}]"}], ["filtered.test.default", 1391832857, {"count"=>2, "inspect"=>"[{\"key2\"=>\"val2\", \"time\"=>1391832857, \"tag\"=>\"test.default\"}, {\"key2_\"=>\"val2_\", \"time\"=>1391832857, \"tag\"=>\"test.default\"}]"}], ["filtered.test.default", 1391832858, {"count"=>2, "inspect"=>"[{\"key3\"=>\"val3\", \"time\"=>1391832858, \"tag\"=>\"test.default\"}, {\"key3_\"=>\"val3_\", \"time\"=>1391832858, \"tag\"=>\"test.default\"}]"}], ["filtered.test.default", 1391832859, {"count"=>2, "inspect"=>"[{\"key4\"=>\"val4\", \"time\"=>1391832859, \"tag\"=>\"test.default\"}, {\"key4_\"=>\"val4_\", \"time\"=>1391832859, \"tag\"=>\"test.default\"}]"}], ["filtered.test.default", 1391832860, {"count"=>2, "inspect"=>"[{\"key5\"=>\"val5\", \"time\"=>1391832860, \"tag\"=>\"test.default\"}, {\"key5_\"=>\"val5_\", \"time\"=>1391832860, \"tag\"=>\"test.default\"}]"}], ["filtered.test.default", 1391832861, {"count"=>2, "inspect"=>"[{\"key6\"=>\"val6\", \"time\"=>1391832861, \"tag\"=>\"test.default\"}, {\"key6_\"=>\"val6_\", \"time\"=>1391832861, \"tag\"=>\"test.default\"}]"}], ["filtered.test.default", 1391832862, {"count"=>2, "inspect"=>"[{\"key7\"=>\"val7\", \"time\"=>1391832862, \"tag\"=>\"test.default\"}, {\"key7_\"=>\"val7_\", \"time\"=>1391832862, \"tag\"=>\"test.default\"}]"}]] ) end end it 'should receive an array of Hash (Not use the default key)' do run_driver(:pass_hash_row => true, :hash_row_time_key => '__TIME__', :hash_row_tag_key => '__TAG__') do |d| (0...10).each do |i| d.emit({"key#{i}" => "val#{i}"}, time + i) d.emit({"key#{i}_" => "val#{i}_"}, time + i) end expect(d.emits).to eq( [["filtered.test.default", 1391832855, {"count"=>2, "inspect"=>"[{\"key0\"=>\"val0\", \"__TIME__\"=>1391832855, \"__TAG__\"=>\"test.default\"}, {\"key0_\"=>\"val0_\", \"__TIME__\"=>1391832855, \"__TAG__\"=>\"test.default\"}]"}], ["filtered.test.default", 1391832856, {"count"=>2, "inspect"=>"[{\"key1\"=>\"val1\", \"__TIME__\"=>1391832856, \"__TAG__\"=>\"test.default\"}, {\"key1_\"=>\"val1_\", \"__TIME__\"=>1391832856, \"__TAG__\"=>\"test.default\"}]"}], ["filtered.test.default", 1391832857, {"count"=>2, "inspect"=>"[{\"key2\"=>\"val2\", \"__TIME__\"=>1391832857, \"__TAG__\"=>\"test.default\"}, {\"key2_\"=>\"val2_\", \"__TIME__\"=>1391832857, \"__TAG__\"=>\"test.default\"}]"}], ["filtered.test.default", 1391832858, {"count"=>2, "inspect"=>"[{\"key3\"=>\"val3\", \"__TIME__\"=>1391832858, \"__TAG__\"=>\"test.default\"}, {\"key3_\"=>\"val3_\", \"__TIME__\"=>1391832858, \"__TAG__\"=>\"test.default\"}]"}], ["filtered.test.default", 1391832859, {"count"=>2, "inspect"=>"[{\"key4\"=>\"val4\", \"__TIME__\"=>1391832859, \"__TAG__\"=>\"test.default\"}, {\"key4_\"=>\"val4_\", \"__TIME__\"=>1391832859, \"__TAG__\"=>\"test.default\"}]"}], ["filtered.test.default", 1391832860, {"count"=>2, "inspect"=>"[{\"key5\"=>\"val5\", \"__TIME__\"=>1391832860, \"__TAG__\"=>\"test.default\"}, {\"key5_\"=>\"val5_\", \"__TIME__\"=>1391832860, \"__TAG__\"=>\"test.default\"}]"}], ["filtered.test.default", 1391832861, {"count"=>2, "inspect"=>"[{\"key6\"=>\"val6\", \"__TIME__\"=>1391832861, \"__TAG__\"=>\"test.default\"}, {\"key6_\"=>\"val6_\", \"__TIME__\"=>1391832861, \"__TAG__\"=>\"test.default\"}]"}], ["filtered.test.default", 1391832862, {"count"=>2, "inspect"=>"[{\"key7\"=>\"val7\", \"__TIME__\"=>1391832862, \"__TAG__\"=>\"test.default\"}, {\"key7_\"=>\"val7_\", \"__TIME__\"=>1391832862, \"__TAG__\"=>\"test.default\"}]"}]] ) end end end describe 'when an error happened' do it 'filter name should be included in the error' do filter = <<-EOS proc {|rs| raise 'Any error message' } EOS run_driver(:filter => filter, :tempfile => 'any_filter.rb') do |d| d.instance.log.should_receive(:error) {|e| expect(e.message).to eq('Any error message') } begin (0...10).each do |i| d.emit({"key#{i}" => "val#{i}"}, time + i) d.emit({"key#{i}_" => "val#{i}_"}, time + i) end rescue => e expect(e.message).to eq('Any error message') expect(File.basename(e.backtrace.first)).to be =~ /\Aany_filter\.rb/ end end end end describe 'when an invalid record was included' do it 'should skip the bad records (Filter returns a Hash)' do filter = <<-EOS proc {|rs| if rs.any? {|i| i[2].has_key?('return_nil')} nil else { 'count' => rs.count, 'inspect' => rs.inspect } end } EOS run_driver(:filter => filter) do |d| d.instance.log.should_receive(:warn) {|msg| expect(msg).to eq('Record must be Hash: nil (NilClass)') } (0...10).each do |i| d.emit({"key#{i}" => "val#{i}"}, time + i) d.emit({"key#{i}_" => "val#{i}_"}, time + i) end d.emit({"return_nil" => 1}, time + 10) (10...20).each do |i| d.emit({"key#{i}" => "val#{i}"}, time + i) d.emit({"key#{i}_" => "val#{i}_"}, time + i) end expect(d.emits).to eq( [["filtered.test.default", 1391832855, {"count"=>2, "inspect"=>"[[\"test.default\", 1391832855, {\"key0\"=>\"val0\"}], [\"test.default\", 1391832855, {\"key0_\"=>\"val0_\"}]]"}], ["filtered.test.default", 1391832856, {"count"=>2, "inspect"=>"[[\"test.default\", 1391832856, {\"key1\"=>\"val1\"}], [\"test.default\", 1391832856, {\"key1_\"=>\"val1_\"}]]"}], ["filtered.test.default", 1391832857, {"count"=>2, "inspect"=>"[[\"test.default\", 1391832857, {\"key2\"=>\"val2\"}], [\"test.default\", 1391832857, {\"key2_\"=>\"val2_\"}]]"}], ["filtered.test.default", 1391832858, {"count"=>2, "inspect"=>"[[\"test.default\", 1391832858, {\"key3\"=>\"val3\"}], [\"test.default\", 1391832858, {\"key3_\"=>\"val3_\"}]]"}], ["filtered.test.default", 1391832859, {"count"=>2, "inspect"=>"[[\"test.default\", 1391832859, {\"key4\"=>\"val4\"}], [\"test.default\", 1391832859, {\"key4_\"=>\"val4_\"}]]"}], ["filtered.test.default", 1391832860, {"count"=>2, "inspect"=>"[[\"test.default\", 1391832860, {\"key5\"=>\"val5\"}], [\"test.default\", 1391832860, {\"key5_\"=>\"val5_\"}]]"}], ["filtered.test.default", 1391832861, {"count"=>2, "inspect"=>"[[\"test.default\", 1391832861, {\"key6\"=>\"val6\"}], [\"test.default\", 1391832861, {\"key6_\"=>\"val6_\"}]]"}], ["filtered.test.default", 1391832862, {"count"=>2, "inspect"=>"[[\"test.default\", 1391832862, {\"key7\"=>\"val7\"}], [\"test.default\", 1391832862, {\"key7_\"=>\"val7_\"}]]"}], ["filtered.test.default", 1391832863, {"count"=>2, "inspect"=>"[[\"test.default\", 1391832863, {\"key8\"=>\"val8\"}], [\"test.default\", 1391832863, {\"key8_\"=>\"val8_\"}]]"}], ["filtered.test.default", 1391832864, {"count"=>2, "inspect"=>"[[\"test.default\", 1391832864, {\"key9\"=>\"val9\"}], [\"test.default\", 1391832864, {\"key9_\"=>\"val9_\"}]]"}], ["filtered.test.default", 1391832866, {"count"=>2, "inspect"=>"[[\"test.default\", 1391832866, {\"key11\"=>\"val11\"}], [\"test.default\", 1391832866, {\"key11_\"=>\"val11_\"}]]"}], ["filtered.test.default", 1391832867, {"count"=>2, "inspect"=>"[[\"test.default\", 1391832867, {\"key12\"=>\"val12\"}], [\"test.default\", 1391832867, {\"key12_\"=>\"val12_\"}]]"}], ["filtered.test.default", 1391832868, {"count"=>2, "inspect"=>"[[\"test.default\", 1391832868, {\"key13\"=>\"val13\"}], [\"test.default\", 1391832868, {\"key13_\"=>\"val13_\"}]]"}], ["filtered.test.default", 1391832869, {"count"=>2, "inspect"=>"[[\"test.default\", 1391832869, {\"key14\"=>\"val14\"}], [\"test.default\", 1391832869, {\"key14_\"=>\"val14_\"}]]"}], ["filtered.test.default", 1391832870, {"count"=>2, "inspect"=>"[[\"test.default\", 1391832870, {\"key15\"=>\"val15\"}], [\"test.default\", 1391832870, {\"key15_\"=>\"val15_\"}]]"}], ["filtered.test.default", 1391832871, {"count"=>2, "inspect"=>"[[\"test.default\", 1391832871, {\"key16\"=>\"val16\"}], [\"test.default\", 1391832871, {\"key16_\"=>\"val16_\"}]]"}], ["filtered.test.default", 1391832872, {"count"=>2, "inspect"=>"[[\"test.default\", 1391832872, {\"key17\"=>\"val17\"}], [\"test.default\", 1391832872, {\"key17_\"=>\"val17_\"}]]"}]] ) end end it 'should skip the bad records (Filter returns an Array)' do filter = <<-EOS proc {|rs| if rs.any? {|i| i[2].has_key?('return_nil')} [nil, 1] else { 'count' => rs.count, 'inspect' => rs.inspect } end } EOS run_driver(:filter => filter) do |d| d.instance.log.should_receive(:warn) {|msg| expect(msg).to eq('Record must be Hash: nil (NilClass)') } d.instance.log.should_receive(:warn) {|msg| expect(msg).to eq('Record must be Hash: 1 (Fixnum)') } (0...10).each do |i| d.emit({"key#{i}" => "val#{i}"}, time + i) d.emit({"key#{i}_" => "val#{i}_"}, time + i) end d.emit({"return_nil" => 1}, time + 10) (10...20).each do |i| d.emit({"key#{i}" => "val#{i}"}, time + i) d.emit({"key#{i}_" => "val#{i}_"}, time + i) end expect(d.emits).to eq( [["filtered.test.default", 1391832855, {"count"=>2, "inspect"=>"[[\"test.default\", 1391832855, {\"key0\"=>\"val0\"}], [\"test.default\", 1391832855, {\"key0_\"=>\"val0_\"}]]"}], ["filtered.test.default", 1391832856, {"count"=>2, "inspect"=>"[[\"test.default\", 1391832856, {\"key1\"=>\"val1\"}], [\"test.default\", 1391832856, {\"key1_\"=>\"val1_\"}]]"}], ["filtered.test.default", 1391832857, {"count"=>2, "inspect"=>"[[\"test.default\", 1391832857, {\"key2\"=>\"val2\"}], [\"test.default\", 1391832857, {\"key2_\"=>\"val2_\"}]]"}], ["filtered.test.default", 1391832858, {"count"=>2, "inspect"=>"[[\"test.default\", 1391832858, {\"key3\"=>\"val3\"}], [\"test.default\", 1391832858, {\"key3_\"=>\"val3_\"}]]"}], ["filtered.test.default", 1391832859, {"count"=>2, "inspect"=>"[[\"test.default\", 1391832859, {\"key4\"=>\"val4\"}], [\"test.default\", 1391832859, {\"key4_\"=>\"val4_\"}]]"}], ["filtered.test.default", 1391832860, {"count"=>2, "inspect"=>"[[\"test.default\", 1391832860, {\"key5\"=>\"val5\"}], [\"test.default\", 1391832860, {\"key5_\"=>\"val5_\"}]]"}], ["filtered.test.default", 1391832861, {"count"=>2, "inspect"=>"[[\"test.default\", 1391832861, {\"key6\"=>\"val6\"}], [\"test.default\", 1391832861, {\"key6_\"=>\"val6_\"}]]"}], ["filtered.test.default", 1391832862, {"count"=>2, "inspect"=>"[[\"test.default\", 1391832862, {\"key7\"=>\"val7\"}], [\"test.default\", 1391832862, {\"key7_\"=>\"val7_\"}]]"}], ["filtered.test.default", 1391832863, {"count"=>2, "inspect"=>"[[\"test.default\", 1391832863, {\"key8\"=>\"val8\"}], [\"test.default\", 1391832863, {\"key8_\"=>\"val8_\"}]]"}], ["filtered.test.default", 1391832864, {"count"=>2, "inspect"=>"[[\"test.default\", 1391832864, {\"key9\"=>\"val9\"}], [\"test.default\", 1391832864, {\"key9_\"=>\"val9_\"}]]"}], ["filtered.test.default", 1391832866, {"count"=>2, "inspect"=>"[[\"test.default\", 1391832866, {\"key11\"=>\"val11\"}], [\"test.default\", 1391832866, {\"key11_\"=>\"val11_\"}]]"}], ["filtered.test.default", 1391832867, {"count"=>2, "inspect"=>"[[\"test.default\", 1391832867, {\"key12\"=>\"val12\"}], [\"test.default\", 1391832867, {\"key12_\"=>\"val12_\"}]]"}], ["filtered.test.default", 1391832868, {"count"=>2, "inspect"=>"[[\"test.default\", 1391832868, {\"key13\"=>\"val13\"}], [\"test.default\", 1391832868, {\"key13_\"=>\"val13_\"}]]"}], ["filtered.test.default", 1391832869, {"count"=>2, "inspect"=>"[[\"test.default\", 1391832869, {\"key14\"=>\"val14\"}], [\"test.default\", 1391832869, {\"key14_\"=>\"val14_\"}]]"}], ["filtered.test.default", 1391832870, {"count"=>2, "inspect"=>"[[\"test.default\", 1391832870, {\"key15\"=>\"val15\"}], [\"test.default\", 1391832870, {\"key15_\"=>\"val15_\"}]]"}], ["filtered.test.default", 1391832871, {"count"=>2, "inspect"=>"[[\"test.default\", 1391832871, {\"key16\"=>\"val16\"}], [\"test.default\", 1391832871, {\"key16_\"=>\"val16_\"}]]"}], ["filtered.test.default", 1391832872, {"count"=>2, "inspect"=>"[[\"test.default\", 1391832872, {\"key17\"=>\"val17\"}], [\"test.default\", 1391832872, {\"key17_\"=>\"val17_\"}]]"}]] ) end end end end