spec/inputs/file_spec.rb in logstash-input-file-2.1.3 vs spec/inputs/file_spec.rb in logstash-input-file-2.2.0
- old
+ new
@@ -150,10 +150,14 @@
let(:codec) { FileInput::CodecTracer.new }
let(:tmpfile_path) { Stud::Temporary.pathname }
let(:sincedb_path) { Stud::Temporary.pathname }
let(:tmpdir_path) { Stud::Temporary.directory }
+ after :each do
+ FileUtils.rm_rf(sincedb_path)
+ end
+
context "when data exists and then more data is appended" do
subject { described_class.new(conf) }
before do
File.open(tmpfile_path, "w") do |fd|
@@ -367,10 +371,11 @@
before do
conf.update(
"path" => tmpdir_path + "/*.log",
"start_position" => "beginning",
+ "stat_interval" => 0.1,
"sincedb_path" => sincedb_path)
File.open(file_path, "w") do |fd|
fd.puts('foo')
fd.puts('bar')
@@ -380,16 +385,100 @@
it "should only have one set of files open" do
subject.register
expect(lsof_proc.call).to eq("")
run_thread_proc.call
- sleep 0.1
+ sleep 0.25
first_lsof = lsof_proc.call
- expect(first_lsof).not_to eq("")
+ expect(first_lsof.scan(file_path).size).to eq(1)
run_thread_proc.call
- sleep 0.1
+ sleep 0.25
second_lsof = lsof_proc.call
- expect(second_lsof).to eq(first_lsof)
+ expect(second_lsof.scan(file_path).size).to eq(1)
end
+ end
+
+ describe "specifying max_open_files" do
+ subject { described_class.new(conf) }
+ before do
+ File.open("#{tmpdir_path}/a.log", "w") do |fd|
+ fd.puts("line1-of-a")
+ fd.puts("line2-of-a")
+ fd.fsync
+ end
+ File.open("#{tmpdir_path}/z.log", "w") do |fd|
+ fd.puts("line1-of-z")
+ fd.puts("line2-of-z")
+ fd.fsync
+ end
+ end
+
+ context "when close_older is NOT specified" do
+ before do
+ conf.clear
+ conf.update(
+ "type" => "blah",
+ "path" => "#{tmpdir_path}/*.log",
+ "sincedb_path" => sincedb_path,
+ "stat_interval" => 0.1,
+ "max_open_files" => 1,
+ "start_position" => "beginning",
+ "delimiter" => FILE_DELIMITER)
+ subject.register
+ Thread.new { subject.run(events) }
+ sleep 0.1
+ end
+ it "collects line events from only one file" do
+        # wait until one file path has been mapped to a codec identity
+ expect(pause_until{ subject.codec.identity_count == 1 }).to be_truthy
+ subject.stop
+ # stop flushes last event
+ expect(pause_until{ events.size == 2 }).to be_truthy
+
+ e1, e2 = events
+ if Dir.glob("#{tmpdir_path}/*.log").first =~ %r{a\.log}
+          # Linux and OS X return glob results in different orders
+ expect(e1["message"]).to eq("line1-of-a")
+ expect(e2["message"]).to eq("line2-of-a")
+ else
+ expect(e1["message"]).to eq("line1-of-z")
+ expect(e2["message"]).to eq("line2-of-z")
+ end
+ end
+ end
+
+ context "when close_older IS specified" do
+ before do
+ conf.update(
+ "type" => "blah",
+ "path" => "#{tmpdir_path}/*.log",
+ "sincedb_path" => sincedb_path,
+ "stat_interval" => 0.1,
+ "max_open_files" => 1,
+ "close_older" => 1,
+ "start_position" => "beginning",
+ "delimiter" => FILE_DELIMITER)
+ subject.register
+ Thread.new { subject.run(events) }
+ sleep 0.1
+ end
+
+ it "collects line events from both files" do
+        # closing a file flushes the last event of each identity
+ expect(pause_until{ events.size == 4 }).to be_truthy
+ subject.stop
+ if Dir.glob("#{tmpdir_path}/*.log").first =~ %r{a\.log}
+          # Linux and OS X return glob results in different orders
+ e1, e2, e3, e4 = events
+ else
+ e3, e4, e1, e2 = events
+ end
+ expect(e1["message"]).to eq("line1-of-a")
+ expect(e2["message"]).to eq("line2-of-a")
+ expect(e3["message"]).to eq("line1-of-z")
+ expect(e4["message"]).to eq("line2-of-z")
+ end
+ end
+
end
end
end
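
The added max_open_files examples rely on a pause_until helper (and the shared events array) whose definitions live in the spec helpers rather than in this diff. As a point of reference only, a polling helper along these lines would satisfy the way the specs use it; the method name matches the call sites above, but the timeout and interval values are assumptions, not taken from the plugin:

    # Hypothetical sketch of a pause_until-style helper: repeatedly evaluate the
    # block until it returns truthy or the timeout elapses, returning false on timeout.
    def pause_until(timeout: 5, interval: 0.1)
      deadline = Time.now + timeout
      loop do
        result = yield
        return result if result
        return false if Time.now > deadline
        sleep interval
      end
    end

Because the specs only assert be_truthy on the result, any bounded poll that returns the block's value on success would behave equivalently.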