spec/inputs/file_spec.rb in logstash-input-file-2.1.3 vs spec/inputs/file_spec.rb in logstash-input-file-2.2.0

- old
+ new

@@ -150,10 +150,14 @@ let(:codec) { FileInput::CodecTracer.new } let(:tmpfile_path) { Stud::Temporary.pathname } let(:sincedb_path) { Stud::Temporary.pathname } let(:tmpdir_path) { Stud::Temporary.directory } + after :each do + FileUtils.rm_rf(sincedb_path) + end + context "when data exists and then more data is appended" do subject { described_class.new(conf) } before do File.open(tmpfile_path, "w") do |fd| @@ -367,10 +371,11 @@ before do conf.update( "path" => tmpdir_path + "/*.log", "start_position" => "beginning", + "stat_interval" => 0.1, "sincedb_path" => sincedb_path) File.open(file_path, "w") do |fd| fd.puts('foo') fd.puts('bar') @@ -380,16 +385,100 @@ it "should only have one set of files open" do subject.register expect(lsof_proc.call).to eq("") run_thread_proc.call - sleep 0.1 + sleep 0.25 first_lsof = lsof_proc.call - expect(first_lsof).not_to eq("") + expect(first_lsof.scan(file_path).size).to eq(1) run_thread_proc.call - sleep 0.1 + sleep 0.25 second_lsof = lsof_proc.call - expect(second_lsof).to eq(first_lsof) + expect(second_lsof.scan(file_path).size).to eq(1) end + end + + describe "specifying max_open_files" do + subject { described_class.new(conf) } + before do + File.open("#{tmpdir_path}/a.log", "w") do |fd| + fd.puts("line1-of-a") + fd.puts("line2-of-a") + fd.fsync + end + File.open("#{tmpdir_path}/z.log", "w") do |fd| + fd.puts("line1-of-z") + fd.puts("line2-of-z") + fd.fsync + end + end + + context "when close_older is NOT specified" do + before do + conf.clear + conf.update( + "type" => "blah", + "path" => "#{tmpdir_path}/*.log", + "sincedb_path" => sincedb_path, + "stat_interval" => 0.1, + "max_open_files" => 1, + "start_position" => "beginning", + "delimiter" => FILE_DELIMITER) + subject.register + Thread.new { subject.run(events) } + sleep 0.1 + end + it "collects line events from only one file" do + # wait for one path to be mapped as identity + expect(pause_until{ subject.codec.identity_count == 1 }).to be_truthy + subject.stop + # stop flushes last event + expect(pause_until{ events.size == 2 }).to be_truthy + + e1, e2 = events + if Dir.glob("#{tmpdir_path}/*.log").first =~ %r{a\.log} + #linux and OSX have different retrieval order + expect(e1["message"]).to eq("line1-of-a") + expect(e2["message"]).to eq("line2-of-a") + else + expect(e1["message"]).to eq("line1-of-z") + expect(e2["message"]).to eq("line2-of-z") + end + end + end + + context "when close_older IS specified" do + before do + conf.update( + "type" => "blah", + "path" => "#{tmpdir_path}/*.log", + "sincedb_path" => sincedb_path, + "stat_interval" => 0.1, + "max_open_files" => 1, + "close_older" => 1, + "start_position" => "beginning", + "delimiter" => FILE_DELIMITER) + subject.register + Thread.new { subject.run(events) } + sleep 0.1 + end + + it "collects line events from both files" do + # close flushes last event of each identity + expect(pause_until{ events.size == 4 }).to be_truthy + subject.stop + if Dir.glob("#{tmpdir_path}/*.log").first =~ %r{a\.log} + #linux and OSX have different retrieval order + e1, e2, e3, e4 = events + else + e3, e4, e1, e2 = events + end + expect(e1["message"]).to eq("line1-of-a") + expect(e2["message"]).to eq("line2-of-a") + expect(e3["message"]).to eq("line1-of-z") + expect(e4["message"]).to eq("line2-of-z") + end + end + end end end