spec/inputs/jdbc_spec.rb in logstash-input-jdbc-2.1.1 vs spec/inputs/jdbc_spec.rb in logstash-input-jdbc-3.0.0
- old
+ new
@@ -159,10 +159,44 @@
Timecop.return
end
end
+ context "when scheduling and previous runs are to be preserved" do
+ let(:settings) do
+ {
+ "statement" => "SELECT 1 as num_param FROM SYSIBM.SYSDUMMY1",
+ "schedule" => "* * * * * UTC",
+ "last_run_metadata_path" => Stud::Temporary.pathname
+ }
+ end
+
+ let(:last_run_time) { Time.at(1).utc }
+
+ before do
+ plugin.register
+ end
+
+ it "should flush previous run metadata per query execution" do
+ Timecop.travel(Time.new(2000))
+ Timecop.scale(60)
+ runner = Thread.new do
+ plugin.run(queue)
+ end
+ sleep 1
+ for i in 0..1
+ sleep 1
+ updated_last_run = YAML.load(File.read(settings["last_run_metadata_path"]))
+ expect(updated_last_run).to be > last_run_time
+ last_run_time = updated_last_run
+ end
+
+ plugin.stop
+ end
+
+ end
+
context "when iterating result-set via paging" do
let(:settings) do
{
"statement" => "SELECT * from test_table",
@@ -293,11 +327,11 @@
end
end
context "when iteratively running plugin#run" do
let(:settings) do
- {"statement" => "SELECT num, created_at FROM test_table WHERE created_at > :sql_last_start"}
+ {"statement" => "SELECT num, created_at FROM test_table WHERE created_at > :sql_last_value"}
end
let(:nums) { [10, 20, 30, 40, 50] }
before do
@@ -327,36 +361,250 @@
expect(actual_sum).to eq(nums.inject{|sum,x| sum + x })
end
end
- context "when previous runs are to be respected" do
+ context "when iteratively running plugin#run with tracking_column" do
+ let(:mixin_settings) do
+ { "jdbc_user" => ENV['USER'], "jdbc_driver_class" => "org.apache.derby.jdbc.EmbeddedDriver",
+ "jdbc_connection_string" => "jdbc:derby:memory:testdb;create=true"
+ }
+ end
let(:settings) do
- { "statement" => "SELECT * FROM test_table",
+ { "statement" => "SELECT num, created_at FROM test_table WHERE num > :sql_last_value",
+ "use_column_value" => true,
+ "tracking_column" => "num",
"last_run_metadata_path" => Stud::Temporary.pathname }
end
- let(:last_run_time) { Time.at(1).utc }
+ let(:nums) { [10, 20, 30, 40, 50] }
before do
+ plugin.register
+ end
+
+ after do
+ plugin.stop
+ end
+
+ it "should successfully update sql_last_value" do
+ test_table = db[:test_table]
+
+ plugin.run(queue)
+ expect(plugin.instance_variable_get("@sql_last_value")).to eq(0)
+ test_table.insert(:num => nums[0], :created_at => Time.now.utc)
+ test_table.insert(:num => nums[1], :created_at => Time.now.utc)
+ plugin.run(queue)
+ expect(plugin.instance_variable_get("@sql_last_value")).to eq(20)
+ test_table.insert(:num => nums[2], :created_at => Time.now.utc)
+ test_table.insert(:num => nums[3], :created_at => Time.now.utc)
+ test_table.insert(:num => nums[4], :created_at => Time.now.utc)
+ plugin.run(queue)
+ expect(plugin.instance_variable_get("@sql_last_value")).to eq(50)
+ end
+ end
+
+ context "when iteratively running plugin#run with tracking_column and stored metadata" do
+ let(:mixin_settings) do
+ { "jdbc_user" => ENV['USER'], "jdbc_driver_class" => "org.apache.derby.jdbc.EmbeddedDriver",
+ "jdbc_connection_string" => "jdbc:derby:memory:testdb;create=true"
+ }
+ end
+
+ let(:settings) do
+ { "statement" => "SELECT num, created_at FROM test_table WHERE num > :sql_last_value",
+ "use_column_value" => true,
+ "tracking_column" => "num",
+ "last_run_metadata_path" => Stud::Temporary.pathname }
+ end
+
+ let(:nums) { [10, 20, 30, 40, 50] }
+ let(:last_run_value) { 20 }
+
+ before do
+ File.write(settings["last_run_metadata_path"], YAML.dump(last_run_value))
+ plugin.register
+ end
+
+ after do
+ plugin.stop
+ end
+
+ it "should successfully update sql_last_value and only add appropriate events" do
+ test_table = db[:test_table]
+
+ plugin.run(queue)
+ expect(plugin.instance_variable_get("@sql_last_value")).to eq(20)
+ expect(queue.length).to eq(0) # Shouldn't grab anything here.
+ test_table.insert(:num => nums[0], :created_at => Time.now.utc)
+ test_table.insert(:num => nums[1], :created_at => Time.now.utc)
+ plugin.run(queue)
+ expect(queue.length).to eq(0) # Shouldn't grab anything here either.
+ expect(plugin.instance_variable_get("@sql_last_value")).to eq(20)
+ test_table.insert(:num => nums[2], :created_at => Time.now.utc)
+ test_table.insert(:num => nums[3], :created_at => Time.now.utc)
+ test_table.insert(:num => nums[4], :created_at => Time.now.utc)
+ plugin.run(queue)
+ expect(queue.length).to eq(3) # Only values greater than 20 should be grabbed.
+ expect(plugin.instance_variable_get("@sql_last_value")).to eq(50)
+ end
+ end
+
+ context "when iteratively running plugin#run with BAD tracking_column and stored metadata" do
+ let(:mixin_settings) do
+ { "jdbc_user" => ENV['USER'], "jdbc_driver_class" => "org.apache.derby.jdbc.EmbeddedDriver",
+ "jdbc_connection_string" => "jdbc:derby:memory:testdb;create=true"
+ }
+ end
+
+ let(:settings) do
+ { "statement" => "SELECT num, created_at FROM test_table WHERE num > :sql_last_value",
+ "use_column_value" => true,
+ "tracking_column" => "not_num",
+ "last_run_metadata_path" => Stud::Temporary.pathname }
+ end
+
+ let(:nums) { [10, 20, 30, 40, 50] }
+ let(:last_run_value) { 20 }
+
+ before do
+ File.write(settings["last_run_metadata_path"], YAML.dump(last_run_value))
+ plugin.register
+ end
+
+ after do
+ plugin.stop
+ end
+
+ it "should send a warning and not update sql_last_value" do
+ test_table = db[:test_table]
+
+ plugin.run(queue)
+ expect(plugin.instance_variable_get("@sql_last_value")).to eq(20)
+ expect(queue.length).to eq(0) # Shouldn't grab anything here.
+ test_table.insert(:num => nums[0], :created_at => Time.now.utc)
+ test_table.insert(:num => nums[1], :created_at => Time.now.utc)
+ plugin.run(queue)
+ expect(queue.length).to eq(0) # Shouldn't grab anything here either.
+ expect(plugin.instance_variable_get("@sql_last_value")).to eq(20)
+ test_table.insert(:num => nums[2], :created_at => Time.now.utc)
+ test_table.insert(:num => nums[3], :created_at => Time.now.utc)
+ test_table.insert(:num => nums[4], :created_at => Time.now.utc)
+ plugin.run(queue)
+ expect(queue.length).to eq(3) # Only values greater than 20 should be grabbed.
+ expect(plugin.instance_variable_get("@sql_last_value")).to eq(20)
+ expect(plugin.instance_variable_get("@tracking_column_warning_sent")).to eq(true)
+ end
+ end
+
+ context "when previous runs are to be respected upon successful query execution (by time)" do
+
+ let(:settings) do
+ { "statement" => "SELECT 1 as num_param FROM SYSIBM.SYSDUMMY1",
+ "last_run_metadata_path" => Stud::Temporary.pathname }
+ end
+
+ let(:last_run_time) { Time.now.utc }
+
+ before do
File.write(settings["last_run_metadata_path"], YAML.dump(last_run_time))
plugin.register
end
after do
plugin.stop
end
it "should respect last run metadata" do
- expect(plugin.instance_variable_get("@sql_last_start")).to eq(last_run_time)
+ plugin.run(queue)
+
+ expect(plugin.instance_variable_get("@sql_last_value")).to be > last_run_time
end
end
- context "when doing a clean run" do
+ context "when previous runs are to be respected upon successful query execution (by column)" do
let(:settings) do
+ { "statement" => "SELECT 1 as num_param FROM SYSIBM.SYSDUMMY1",
+ "use_column_value" => true,
+ "tracking_column" => "num_param",
+ "last_run_metadata_path" => Stud::Temporary.pathname }
+ end
+
+ let(:last_run_value) { 1 }
+
+ before do
+ File.write(settings["last_run_metadata_path"], YAML.dump(last_run_value))
+ plugin.register
+ end
+
+ after do
+ plugin.stop
+ end
+
+ it "metadata should equal last_run_value" do
+ plugin.run(queue)
+
+ expect(plugin.instance_variable_get("@sql_last_value")).to eq(last_run_value)
+ end
+ end
+
+ context "when previous runs are to be respected upon query failure (by time)" do
+ let(:settings) do
+ { "statement" => "SELECT col from non_existent_table",
+ "last_run_metadata_path" => Stud::Temporary.pathname }
+ end
+
+ let(:last_run_time) { Time.now.utc }
+
+ before do
+ File.write(settings["last_run_metadata_path"], YAML.dump(last_run_time))
+ plugin.register
+ end
+
+ after do
+ plugin.stop
+ end
+
+ it "should not respect last run metadata" do
+ plugin.run(queue)
+
+ expect(plugin.instance_variable_get("@sql_last_value")).to eq(last_run_time)
+ end
+ end
+
+ context "when previous runs are to be respected upon query failure (by column)" do
+ let(:settings) do
+ { "statement" => "SELECT col from non_existent_table",
+ "use_column_value" => true,
+ "tracking_column" => "num_param",
+ "last_run_metadata_path" => Stud::Temporary.pathname
+ }
+ end
+
+ let(:last_run_value) { 1 }
+
+ before do
+ File.write(settings["last_run_metadata_path"], YAML.dump(last_run_value))
+ plugin.register
+ end
+
+ after do
+ plugin.stop
+ end
+
+ it "metadata should still reflect last value" do
+ plugin.run(queue)
+
+ expect(plugin.instance_variable_get("@sql_last_value")).to eq(last_run_value)
+ end
+ end
+
+ context "when doing a clean run (by time)" do
+
+ let(:settings) do
{
"statement" => "SELECT * FROM test_table",
"last_run_metadata_path" => Stud::Temporary.pathname,
"clean_run" => true
}
@@ -372,12 +620,41 @@
after do
plugin.stop
end
it "should ignore last run metadata if :clean_run set to true" do
- expect(plugin.instance_variable_get("@sql_last_start")).to eq(Time.at(0).utc)
+ expect(plugin.instance_variable_get("@sql_last_value")).to eq(Time.at(0).utc)
end
end
+
+ context "when doing a clean run (by value)" do
+
+ let(:settings) do
+ {
+ "statement" => "SELECT * FROM test_table",
+ "last_run_metadata_path" => Stud::Temporary.pathname,
+ "use_column_value" => true,
+ "tracking_column" => "num_param",
+ "clean_run" => true
+ }
+ end
+
+ let(:last_run_value) { 1000 }
+
+ before do
+ File.write(settings["last_run_metadata_path"], YAML.dump(last_run_value))
+ plugin.register
+ end
+
+ after do
+ plugin.stop
+ end
+
+ it "should ignore last run metadata if :clean_run set to true" do
+ expect(plugin.instance_variable_get("@sql_last_value")).to eq(0)
+ end
+ end
+
context "when state is not to be persisted" do
let(:settings) do
{
"statement" => "SELECT * FROM test_table",