spec/inputs/jdbc_spec.rb in logstash-input-jdbc-4.3.4 vs spec/inputs/jdbc_spec.rb in logstash-input-jdbc-4.3.5

- old
+ new

@@ -36,17 +36,19 @@ Integer :num String :string DateTime :custom_time end db << "CREATE TABLE types_table (num INTEGER, string VARCHAR(255), started_at DATE, custom_time TIMESTAMP, ranking DECIMAL(16,6))" + db << "CREATE TABLE test1_table (num INTEGER, string VARCHAR(255), custom_time TIMESTAMP, created_at TIMESTAMP)" end end after :each do if !RSpec.current_example.metadata[:no_connection] db.drop_table(:test_table) db.drop_table(:types_table) + db.drop_table(:test1_table) end end context "when registering and tearing down" do let(:settings) { {"statement" => "SELECT 1 as col1 FROM test_table"} } @@ -320,65 +322,148 @@ event = queue.pop expect(event.get("custom_time")).to be_a(LogStash::Timestamp) end end - context "when fetching time data with jdbc_default_timezone set" do + describe "when jdbc_default_timezone is set" do let(:mixin_settings) do { "jdbc_user" => ENV['USER'], "jdbc_driver_class" => "org.apache.derby.jdbc.EmbeddedDriver", "jdbc_connection_string" => "jdbc:derby:memory:testdb;create=true", "jdbc_default_timezone" => "America/Chicago" } end - let(:settings) do - { - "statement" => "SELECT * from test_table WHERE custom_time > :sql_last_value", - "use_column_value" => true, - "tracking_column" => "custom_time", - "last_run_metadata_path" => Stud::Temporary.pathname - } + let(:hours) { [10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20] } + + context "when fetching time data and the tracking column is set and tracking column type defaults to 'numeric'" do + let(:settings) do + { + "statement" => "SELECT * from test_table WHERE num > :sql_last_value", + "last_run_metadata_path" => Stud::Temporary.pathname, + "tracking_column" => "num", + "use_column_value" => true + } + end + + it "should convert the time to reflect the timezone " do + File.write(settings["last_run_metadata_path"], YAML.dump(42)) + + db[:test_table].insert(:num => 42, :custom_time => "2015-01-01 10:10:10", :created_at => Time.now.utc) + db[:test_table].insert(:num => 43, :custom_time => "2015-01-01 11:11:11", :created_at => Time.now.utc) + + plugin.register + plugin.run(queue) + plugin.stop + expect(queue.size).to eq(1) + event = queue.pop + expect(event.get("num")).to eq(43) + expect(event.get("custom_time").time).to eq(Time.iso8601("2015-01-01T17:11:11.000Z")) + end end - let(:hour_range) { 10..20 } + context "when fetching time data and the tracking column is NOT set, sql_last_value is time of run" do - it "should convert the time to reflect the timezone " do - last_run_value = Time.iso8601("2000-01-01T00:00:00.000Z") - File.write(settings["last_run_metadata_path"], YAML.dump(last_run_value)) + let(:settings) do + { + "statement" => "SELECT * from test_table WHERE custom_time > :sql_last_value", + "last_run_metadata_path" => Stud::Temporary.pathname + } + end - hour_range.each do |i| - db[:test_table].insert(:num => i, :custom_time => "2015-01-01 #{i}:00:00", :created_at => Time.now.utc) + it "should convert the time to reflect the timezone " do + last_run_value = DateTime.iso8601("2000-01-01T12:00:00.000Z") + File.write(settings["last_run_metadata_path"], YAML.dump(last_run_value)) + + Timecop.travel(DateTime.iso8601("2015-01-01T15:50:00.000Z")) do + # simulate earlier records written + hours.each do |i| + db[:test_table].insert(:num => i, :custom_time => "2015-01-01 #{i}:00:00", :created_at => Time.now.utc) + end + end + + Timecop.travel(DateTime.iso8601("2015-01-02T02:10:00.000Z")) do + # simulate the first plugin run after the custom time of the last record + plugin.register + plugin.run(queue) + expected = hours.map{|hour| Time.iso8601("2015-01-01T06:00:00.000Z") + (hour * 3600) }# because Sequel converts the column values to Time instances. + actual = queue.size.times.map { queue.pop.get("custom_time").time } + expect(actual).to eq(expected) + plugin.stop + end + Timecop.travel(DateTime.iso8601("2015-01-02T02:20:00.000Z")) do + # simulate a run 10 minutes later + plugin.run(queue) + expect(queue.size).to eq(0) # no new records + plugin.stop + # now add records + db[:test_table].insert(:num => 11, :custom_time => "2015-01-01 20:20:20", :created_at => Time.now.utc) + db[:test_table].insert(:num => 12, :custom_time => "2015-01-01 21:21:21", :created_at => Time.now.utc) + end + Timecop.travel(DateTime.iso8601("2015-01-02T03:30:00.000Z")) do + # simulate another run later than the custom time of the last record + plugin.run(queue) + expect(queue.size).to eq(2) + plugin.stop + end + event = queue.pop + expect(event.get("num")).to eq(11) + expect(event.get("custom_time").time).to eq(Time.iso8601("2015-01-02T02:20:20.000Z")) + event = queue.pop + expect(event.get("num")).to eq(12) + expect(event.get("custom_time").time).to eq(Time.iso8601("2015-01-02T03:21:21.000Z")) end + end - plugin.register + context "when fetching time data and the tracking column is set, sql_last_value is sourced from a column, sub-second precision is maintained" do + let(:settings) do + { + "statement" => "SELECT * from test1_table WHERE custom_time > :sql_last_value ORDER BY custom_time", + "use_column_value" => true, + "tracking_column" => "custom_time", + "tracking_column_type" => "timestamp", + "last_run_metadata_path" => Stud::Temporary.pathname + } + end - plugin.run(queue) - expected = ["2015-01-01T16:00:00.000Z", - "2015-01-01T17:00:00.000Z", - "2015-01-01T18:00:00.000Z", - "2015-01-01T19:00:00.000Z", - "2015-01-01T20:00:00.000Z", - "2015-01-01T21:00:00.000Z", - "2015-01-01T22:00:00.000Z", - "2015-01-01T23:00:00.000Z", - "2015-01-02T00:00:00.000Z", - "2015-01-02T01:00:00.000Z", - "2015-01-02T02:00:00.000Z"].map { |i| Time.iso8601(i) } - actual = queue.size.times.map { queue.pop.get("custom_time").time } - expect(actual).to eq(expected) - plugin.stop + let(:msecs) { [111, 122, 233, 244, 355, 366, 477, 488, 599, 611, 722] } - plugin.run(queue) - expect(queue.size).to eq(0) - db[:test_table].insert(:num => 11, :custom_time => "2015-01-01 11:00:00", :created_at => Time.now.utc) - db[:test_table].insert(:num => 12, :custom_time => "2015-01-01 21:00:00", :created_at => Time.now.utc) - plugin.run(queue) - expect(queue.size).to eq(1) - event = queue.pop - expect(event.get("num")).to eq(12) - expect(event.get("custom_time").time).to eq(Time.iso8601("2015-01-02T03:00:00.000Z")) - p settings + it "should convert the time to reflect the timezone " do + # Sequel only does the *correct* timezone calc on a DateTime instance + last_run_value = DateTime.iso8601("2000-01-01T00:00:00.987Z") + File.write(settings["last_run_metadata_path"], YAML.dump(last_run_value)) + + hours.each_with_index do |i, j| + db[:test1_table].insert(:num => i, :custom_time => "2015-01-01 #{i}:00:00.#{msecs[j]}", :created_at => Time.now.utc) + end + + plugin.register + + plugin.run(queue) + expected = hours.map.with_index {|hour, i| Time.iso8601("2015-01-01T06:00:00.000Z") + (hour * 3600 + (msecs[i] / 1000.0)) } + actual = queue.size.times.map { queue.pop.get("custom_time").time } + expect(actual).to eq(expected) + plugin.stop + last_run_value = YAML.load(File.read(settings["last_run_metadata_path"])) + expect(last_run_value).to be_a(DateTime) + expect(last_run_value.strftime("%F %T.%N %Z")).to eq("2015-01-02 02:00:00.722000000 +00:00") + + plugin.run(queue) + plugin.stop + db[:test1_table].insert(:num => 11, :custom_time => "2015-01-01 11:00:00.099", :created_at => Time.now.utc) + db[:test1_table].insert(:num => 12, :custom_time => "2015-01-01 21:00:00.811", :created_at => Time.now.utc) + expect(queue.size).to eq(0) + plugin.run(queue) + expect(queue.size).to eq(1) + event = queue.pop + plugin.stop + expect(event.get("num")).to eq(12) + expect(event.get("custom_time").time).to eq(Time.iso8601("2015-01-02T03:00:00.811Z")) + last_run_value = YAML.load(File.read(settings["last_run_metadata_path"])) + expect(last_run_value).to be_a(DateTime) + # verify that sub-seconds are recorded to the file + expect(last_run_value.strftime("%F %T.%N %Z")).to eq("2015-01-02 03:00:00.811000000 +00:00") + end end end context "when fetching time data without jdbc_default_timezone set" do let(:mixin_settings) do @@ -477,20 +562,20 @@ it "should successfully update sql_last_value" do test_table = db[:test_table] plugin.run(queue) - expect(plugin.instance_variable_get("@sql_last_value")).to eq(0) + expect(plugin.instance_variable_get("@value_tracker").value).to eq(0) test_table.insert(:num => nums[0], :created_at => Time.now.utc) test_table.insert(:num => nums[1], :created_at => Time.now.utc) plugin.run(queue) - expect(plugin.instance_variable_get("@sql_last_value")).to eq(20) + expect(plugin.instance_variable_get("@value_tracker").value).to eq(20) test_table.insert(:num => nums[2], :created_at => Time.now.utc) test_table.insert(:num => nums[3], :created_at => Time.now.utc) test_table.insert(:num => nums[4], :created_at => Time.now.utc) plugin.run(queue) - expect(plugin.instance_variable_get("@sql_last_value")).to eq(50) + expect(plugin.instance_variable_get("@value_tracker").value).to eq(50) end end context "when iteratively running plugin#run with timestamp tracking column with column value" do let(:mixin_settings) do @@ -520,21 +605,21 @@ it "should successfully update sql_last_value" do test_table = db[:test_table] plugin.run(queue) - expect(plugin.instance_variable_get("@sql_last_value")).to eq(Time.parse("1970-01-01 00:00:00.000000000 +0000")) + expect(plugin.instance_variable_get("@value_tracker").value).to eq(Time.parse("1970-01-01 00:00:00.000000000 +0000")) test_table.insert(:num => nums[0], :created_at => Time.now.utc, :custom_time => times[0]) test_table.insert(:num => nums[1], :created_at => Time.now.utc, :custom_time => times[1]) plugin.run(queue) - expect(plugin.instance_variable_get("@sql_last_value").class).to eq(Time.parse(times[0]).class) - expect(plugin.instance_variable_get("@sql_last_value")).to eq(Time.parse(times[1])) + expect(plugin.instance_variable_get("@value_tracker").value.class).to eq(Time.parse(times[0]).class) + expect(plugin.instance_variable_get("@value_tracker").value).to eq(Time.parse(times[1])) test_table.insert(:num => nums[2], :created_at => Time.now.utc, :custom_time => times[2]) test_table.insert(:num => nums[3], :created_at => Time.now.utc, :custom_time => times[3]) test_table.insert(:num => nums[4], :created_at => Time.now.utc, :custom_time => times[4]) plugin.run(queue) - expect(plugin.instance_variable_get("@sql_last_value")).to eq(Time.parse(times[4])) + expect(plugin.instance_variable_get("@value_tracker").value).to eq(Time.parse(times[4])) end end context "when iteratively running plugin#run with tracking_column and stored metadata" do let(:mixin_settings) do @@ -564,23 +649,23 @@ it "should successfully update sql_last_value and only add appropriate events" do test_table = db[:test_table] plugin.run(queue) - expect(plugin.instance_variable_get("@sql_last_value")).to eq(20) + expect(plugin.instance_variable_get("@value_tracker").value).to eq(20) expect(queue.length).to eq(0) # Shouldn't grab anything here. test_table.insert(:num => nums[0], :created_at => Time.now.utc) test_table.insert(:num => nums[1], :created_at => Time.now.utc) plugin.run(queue) expect(queue.length).to eq(0) # Shouldn't grab anything here either. - expect(plugin.instance_variable_get("@sql_last_value")).to eq(20) + expect(plugin.instance_variable_get("@value_tracker").value).to eq(20) test_table.insert(:num => nums[2], :created_at => Time.now.utc) test_table.insert(:num => nums[3], :created_at => Time.now.utc) test_table.insert(:num => nums[4], :created_at => Time.now.utc) plugin.run(queue) expect(queue.length).to eq(3) # Only values greater than 20 should be grabbed. - expect(plugin.instance_variable_get("@sql_last_value")).to eq(50) + expect(plugin.instance_variable_get("@value_tracker").value).to eq(50) end end context "when iteratively running plugin#run with BAD tracking_column and stored metadata" do let(:mixin_settings) do @@ -610,23 +695,23 @@ it "should send a warning and not update sql_last_value" do test_table = db[:test_table] plugin.run(queue) - expect(plugin.instance_variable_get("@sql_last_value")).to eq(20) + expect(plugin.instance_variable_get("@value_tracker").value).to eq(20) expect(queue.length).to eq(0) # Shouldn't grab anything here. test_table.insert(:num => nums[0], :created_at => Time.now.utc) test_table.insert(:num => nums[1], :created_at => Time.now.utc) plugin.run(queue) expect(queue.length).to eq(0) # Shouldn't grab anything here either. - expect(plugin.instance_variable_get("@sql_last_value")).to eq(20) + expect(plugin.instance_variable_get("@value_tracker").value).to eq(20) test_table.insert(:num => nums[2], :created_at => Time.now.utc) test_table.insert(:num => nums[3], :created_at => Time.now.utc) test_table.insert(:num => nums[4], :created_at => Time.now.utc) plugin.run(queue) expect(queue.length).to eq(3) # Only values greater than 20 should be grabbed. - expect(plugin.instance_variable_get("@sql_last_value")).to eq(20) + expect(plugin.instance_variable_get("@value_tracker").value).to eq(20) expect(plugin.instance_variable_get("@tracking_column_warning_sent")).to eq(true) end end context "when previous runs are to be respected upon successful query execution (by time)" do @@ -648,11 +733,11 @@ end it "should respect last run metadata" do plugin.run(queue) - expect(plugin.instance_variable_get("@sql_last_value")).to be > last_run_time + expect(plugin.instance_variable_get("@value_tracker").value).to be > last_run_time end end context "when previous runs are to be respected upon successful query execution (by column)" do @@ -675,11 +760,11 @@ end it "metadata should equal last_run_value" do plugin.run(queue) - expect(plugin.instance_variable_get("@sql_last_value")).to eq(last_run_value) + expect(plugin.instance_variable_get("@value_tracker").value).to eq(last_run_value) end end context "when previous runs are to be respected upon query failure (by time)" do let(:settings) do @@ -699,11 +784,11 @@ end it "should not respect last run metadata" do plugin.run(queue) - expect(plugin.instance_variable_get("@sql_last_value")).to eq(last_run_time) + expect(plugin.instance_variable_get("@value_tracker").value).to eq(last_run_time) end end context "when previous runs are to be respected upon query failure (by column)" do let(:settings) do @@ -726,11 +811,11 @@ end it "metadata should still reflect last value" do plugin.run(queue) - expect(plugin.instance_variable_get("@sql_last_value")).to eq(last_run_value) + expect(plugin.instance_variable_get("@value_tracker").value).to eq(last_run_value) end end context "when doing a clean run (by time)" do @@ -752,11 +837,11 @@ after do plugin.stop end it "should ignore last run metadata if :clean_run set to true" do - expect(plugin.instance_variable_get("@sql_last_value")).to eq(Time.at(0).utc) + expect(plugin.instance_variable_get("@value_tracker").value).to eq(Time.at(0).utc) end end context "when doing a clean run (by value)" do @@ -780,10 +865,10 @@ after do plugin.stop end it "should ignore last run metadata if :clean_run set to true" do - expect(plugin.instance_variable_get("@sql_last_value")).to eq(0) + expect(plugin.instance_variable_get("@value_tracker").value).to eq(0) end end context "when state is not to be persisted" do