spec/inputs/jdbc_spec.rb in logstash-input-jdbc-2.1.1 vs spec/inputs/jdbc_spec.rb in logstash-input-jdbc-3.0.0

- old
+ new

@@ -159,10 +159,44 @@ Timecop.return end end + context "when scheduling and previous runs are to be preserved" do + let(:settings) do + { + "statement" => "SELECT 1 as num_param FROM SYSIBM.SYSDUMMY1", + "schedule" => "* * * * * UTC", + "last_run_metadata_path" => Stud::Temporary.pathname + } + end + + let(:last_run_time) { Time.at(1).utc } + + before do + plugin.register + end + + it "should flush previous run metadata per query execution" do + Timecop.travel(Time.new(2000)) + Timecop.scale(60) + runner = Thread.new do + plugin.run(queue) + end + sleep 1 + for i in 0..1 + sleep 1 + updated_last_run = YAML.load(File.read(settings["last_run_metadata_path"])) + expect(updated_last_run).to be > last_run_time + last_run_time = updated_last_run + end + + plugin.stop + end + + end + context "when iterating result-set via paging" do let(:settings) do { "statement" => "SELECT * from test_table", @@ -293,11 +327,11 @@ end end context "when iteratively running plugin#run" do let(:settings) do - {"statement" => "SELECT num, created_at FROM test_table WHERE created_at > :sql_last_start"} + {"statement" => "SELECT num, created_at FROM test_table WHERE created_at > :sql_last_value"} end let(:nums) { [10, 20, 30, 40, 50] } before do @@ -327,36 +361,250 @@ expect(actual_sum).to eq(nums.inject{|sum,x| sum + x }) end end - context "when previous runs are to be respected" do + context "when iteratively running plugin#run with tracking_column" do + let(:mixin_settings) do + { "jdbc_user" => ENV['USER'], "jdbc_driver_class" => "org.apache.derby.jdbc.EmbeddedDriver", + "jdbc_connection_string" => "jdbc:derby:memory:testdb;create=true" + } + end let(:settings) do - { "statement" => "SELECT * FROM test_table", + { "statement" => "SELECT num, created_at FROM test_table WHERE num > :sql_last_value", + "use_column_value" => true, + "tracking_column" => "num", "last_run_metadata_path" => Stud::Temporary.pathname } end - let(:last_run_time) { Time.at(1).utc } + let(:nums) { [10, 20, 30, 40, 50] } before do + plugin.register + end + + after do + plugin.stop + end + + it "should successfully update sql_last_value" do + test_table = db[:test_table] + + plugin.run(queue) + expect(plugin.instance_variable_get("@sql_last_value")).to eq(0) + test_table.insert(:num => nums[0], :created_at => Time.now.utc) + test_table.insert(:num => nums[1], :created_at => Time.now.utc) + plugin.run(queue) + expect(plugin.instance_variable_get("@sql_last_value")).to eq(20) + test_table.insert(:num => nums[2], :created_at => Time.now.utc) + test_table.insert(:num => nums[3], :created_at => Time.now.utc) + test_table.insert(:num => nums[4], :created_at => Time.now.utc) + plugin.run(queue) + expect(plugin.instance_variable_get("@sql_last_value")).to eq(50) + end + end + + context "when iteratively running plugin#run with tracking_column and stored metadata" do + let(:mixin_settings) do + { "jdbc_user" => ENV['USER'], "jdbc_driver_class" => "org.apache.derby.jdbc.EmbeddedDriver", + "jdbc_connection_string" => "jdbc:derby:memory:testdb;create=true" + } + end + + let(:settings) do + { "statement" => "SELECT num, created_at FROM test_table WHERE num > :sql_last_value", + "use_column_value" => true, + "tracking_column" => "num", + "last_run_metadata_path" => Stud::Temporary.pathname } + end + + let(:nums) { [10, 20, 30, 40, 50] } + let(:last_run_value) { 20 } + + before do + File.write(settings["last_run_metadata_path"], YAML.dump(last_run_value)) + plugin.register + end + + after do + plugin.stop + end + + it "should successfully update sql_last_value and only add appropriate events" do + test_table = db[:test_table] + + plugin.run(queue) + expect(plugin.instance_variable_get("@sql_last_value")).to eq(20) + expect(queue.length).to eq(0) # Shouldn't grab anything here. + test_table.insert(:num => nums[0], :created_at => Time.now.utc) + test_table.insert(:num => nums[1], :created_at => Time.now.utc) + plugin.run(queue) + expect(queue.length).to eq(0) # Shouldn't grab anything here either. + expect(plugin.instance_variable_get("@sql_last_value")).to eq(20) + test_table.insert(:num => nums[2], :created_at => Time.now.utc) + test_table.insert(:num => nums[3], :created_at => Time.now.utc) + test_table.insert(:num => nums[4], :created_at => Time.now.utc) + plugin.run(queue) + expect(queue.length).to eq(3) # Only values greater than 20 should be grabbed. + expect(plugin.instance_variable_get("@sql_last_value")).to eq(50) + end + end + + context "when iteratively running plugin#run with BAD tracking_column and stored metadata" do + let(:mixin_settings) do + { "jdbc_user" => ENV['USER'], "jdbc_driver_class" => "org.apache.derby.jdbc.EmbeddedDriver", + "jdbc_connection_string" => "jdbc:derby:memory:testdb;create=true" + } + end + + let(:settings) do + { "statement" => "SELECT num, created_at FROM test_table WHERE num > :sql_last_value", + "use_column_value" => true, + "tracking_column" => "not_num", + "last_run_metadata_path" => Stud::Temporary.pathname } + end + + let(:nums) { [10, 20, 30, 40, 50] } + let(:last_run_value) { 20 } + + before do + File.write(settings["last_run_metadata_path"], YAML.dump(last_run_value)) + plugin.register + end + + after do + plugin.stop + end + + it "should send a warning and not update sql_last_value" do + test_table = db[:test_table] + + plugin.run(queue) + expect(plugin.instance_variable_get("@sql_last_value")).to eq(20) + expect(queue.length).to eq(0) # Shouldn't grab anything here. + test_table.insert(:num => nums[0], :created_at => Time.now.utc) + test_table.insert(:num => nums[1], :created_at => Time.now.utc) + plugin.run(queue) + expect(queue.length).to eq(0) # Shouldn't grab anything here either. + expect(plugin.instance_variable_get("@sql_last_value")).to eq(20) + test_table.insert(:num => nums[2], :created_at => Time.now.utc) + test_table.insert(:num => nums[3], :created_at => Time.now.utc) + test_table.insert(:num => nums[4], :created_at => Time.now.utc) + plugin.run(queue) + expect(queue.length).to eq(3) # Only values greater than 20 should be grabbed. + expect(plugin.instance_variable_get("@sql_last_value")).to eq(20) + expect(plugin.instance_variable_get("@tracking_column_warning_sent")).to eq(true) + end + end + + context "when previous runs are to be respected upon successful query execution (by time)" do + + let(:settings) do + { "statement" => "SELECT 1 as num_param FROM SYSIBM.SYSDUMMY1", + "last_run_metadata_path" => Stud::Temporary.pathname } + end + + let(:last_run_time) { Time.now.utc } + + before do File.write(settings["last_run_metadata_path"], YAML.dump(last_run_time)) plugin.register end after do plugin.stop end it "should respect last run metadata" do - expect(plugin.instance_variable_get("@sql_last_start")).to eq(last_run_time) + plugin.run(queue) + + expect(plugin.instance_variable_get("@sql_last_value")).to be > last_run_time end end - context "when doing a clean run" do + context "when previous runs are to be respected upon successful query execution (by column)" do let(:settings) do + { "statement" => "SELECT 1 as num_param FROM SYSIBM.SYSDUMMY1", + "use_column_value" => true, + "tracking_column" => "num_param", + "last_run_metadata_path" => Stud::Temporary.pathname } + end + + let(:last_run_value) { 1 } + + before do + File.write(settings["last_run_metadata_path"], YAML.dump(last_run_value)) + plugin.register + end + + after do + plugin.stop + end + + it "metadata should equal last_run_value" do + plugin.run(queue) + + expect(plugin.instance_variable_get("@sql_last_value")).to eq(last_run_value) + end + end + + context "when previous runs are to be respected upon query failure (by time)" do + let(:settings) do + { "statement" => "SELECT col from non_existent_table", + "last_run_metadata_path" => Stud::Temporary.pathname } + end + + let(:last_run_time) { Time.now.utc } + + before do + File.write(settings["last_run_metadata_path"], YAML.dump(last_run_time)) + plugin.register + end + + after do + plugin.stop + end + + it "should not respect last run metadata" do + plugin.run(queue) + + expect(plugin.instance_variable_get("@sql_last_value")).to eq(last_run_time) + end + end + + context "when previous runs are to be respected upon query failure (by column)" do + let(:settings) do + { "statement" => "SELECT col from non_existent_table", + "use_column_value" => true, + "tracking_column" => "num_param", + "last_run_metadata_path" => Stud::Temporary.pathname + } + end + + let(:last_run_value) { 1 } + + before do + File.write(settings["last_run_metadata_path"], YAML.dump(last_run_value)) + plugin.register + end + + after do + plugin.stop + end + + it "metadata should still reflect last value" do + plugin.run(queue) + + expect(plugin.instance_variable_get("@sql_last_value")).to eq(last_run_value) + end + end + + context "when doing a clean run (by time)" do + + let(:settings) do { "statement" => "SELECT * FROM test_table", "last_run_metadata_path" => Stud::Temporary.pathname, "clean_run" => true } @@ -372,12 +620,41 @@ after do plugin.stop end it "should ignore last run metadata if :clean_run set to true" do - expect(plugin.instance_variable_get("@sql_last_start")).to eq(Time.at(0).utc) + expect(plugin.instance_variable_get("@sql_last_value")).to eq(Time.at(0).utc) end end + + context "when doing a clean run (by value)" do + + let(:settings) do + { + "statement" => "SELECT * FROM test_table", + "last_run_metadata_path" => Stud::Temporary.pathname, + "use_column_value" => true, + "tracking_column" => "num_param", + "clean_run" => true + } + end + + let(:last_run_value) { 1000 } + + before do + File.write(settings["last_run_metadata_path"], YAML.dump(last_run_value)) + plugin.register + end + + after do + plugin.stop + end + + it "should ignore last run metadata if :clean_run set to true" do + expect(plugin.instance_variable_get("@sql_last_value")).to eq(0) + end + end + context "when state is not to be persisted" do let(:settings) do { "statement" => "SELECT * FROM test_table",