spec/inputs/jdbc_spec.rb in logstash-input-jdbc-4.3.4 vs spec/inputs/jdbc_spec.rb in logstash-input-jdbc-4.3.5
- old
+ new
@@ -36,17 +36,19 @@
Integer :num
String :string
DateTime :custom_time
end
db << "CREATE TABLE types_table (num INTEGER, string VARCHAR(255), started_at DATE, custom_time TIMESTAMP, ranking DECIMAL(16,6))"
+ db << "CREATE TABLE test1_table (num INTEGER, string VARCHAR(255), custom_time TIMESTAMP, created_at TIMESTAMP)"
end
end
after :each do
if !RSpec.current_example.metadata[:no_connection]
db.drop_table(:test_table)
db.drop_table(:types_table)
+ db.drop_table(:test1_table)
end
end
context "when registering and tearing down" do
let(:settings) { {"statement" => "SELECT 1 as col1 FROM test_table"} }
@@ -320,65 +322,148 @@
event = queue.pop
expect(event.get("custom_time")).to be_a(LogStash::Timestamp)
end
end
- context "when fetching time data with jdbc_default_timezone set" do
+ describe "when jdbc_default_timezone is set" do
let(:mixin_settings) do
{ "jdbc_user" => ENV['USER'], "jdbc_driver_class" => "org.apache.derby.jdbc.EmbeddedDriver",
"jdbc_connection_string" => "jdbc:derby:memory:testdb;create=true",
"jdbc_default_timezone" => "America/Chicago"
}
end
- let(:settings) do
- {
- "statement" => "SELECT * from test_table WHERE custom_time > :sql_last_value",
- "use_column_value" => true,
- "tracking_column" => "custom_time",
- "last_run_metadata_path" => Stud::Temporary.pathname
- }
+ let(:hours) { [10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20] }
+
+ context "when fetching time data and the tracking column is set and tracking column type defaults to 'numeric'" do
+ let(:settings) do
+ {
+ "statement" => "SELECT * from test_table WHERE num > :sql_last_value",
+ "last_run_metadata_path" => Stud::Temporary.pathname,
+ "tracking_column" => "num",
+ "use_column_value" => true
+ }
+ end
+
+ it "should convert the time to reflect the timezone " do
+ File.write(settings["last_run_metadata_path"], YAML.dump(42))
+
+ db[:test_table].insert(:num => 42, :custom_time => "2015-01-01 10:10:10", :created_at => Time.now.utc)
+ db[:test_table].insert(:num => 43, :custom_time => "2015-01-01 11:11:11", :created_at => Time.now.utc)
+
+ plugin.register
+ plugin.run(queue)
+ plugin.stop
+ expect(queue.size).to eq(1)
+ event = queue.pop
+ expect(event.get("num")).to eq(43)
+ expect(event.get("custom_time").time).to eq(Time.iso8601("2015-01-01T17:11:11.000Z"))
+ end
end
- let(:hour_range) { 10..20 }
+ context "when fetching time data and the tracking column is NOT set, sql_last_value is time of run" do
- it "should convert the time to reflect the timezone " do
- last_run_value = Time.iso8601("2000-01-01T00:00:00.000Z")
- File.write(settings["last_run_metadata_path"], YAML.dump(last_run_value))
+ let(:settings) do
+ {
+ "statement" => "SELECT * from test_table WHERE custom_time > :sql_last_value",
+ "last_run_metadata_path" => Stud::Temporary.pathname
+ }
+ end
- hour_range.each do |i|
- db[:test_table].insert(:num => i, :custom_time => "2015-01-01 #{i}:00:00", :created_at => Time.now.utc)
+ it "should convert the time to reflect the timezone " do
+ last_run_value = DateTime.iso8601("2000-01-01T12:00:00.000Z")
+ File.write(settings["last_run_metadata_path"], YAML.dump(last_run_value))
+
+ Timecop.travel(DateTime.iso8601("2015-01-01T15:50:00.000Z")) do
+ # simulate earlier records written
+ hours.each do |i|
+ db[:test_table].insert(:num => i, :custom_time => "2015-01-01 #{i}:00:00", :created_at => Time.now.utc)
+ end
+ end
+
+ Timecop.travel(DateTime.iso8601("2015-01-02T02:10:00.000Z")) do
+ # simulate the first plugin run after the custom time of the last record
+ plugin.register
+ plugin.run(queue)
+ expected = hours.map{|hour| Time.iso8601("2015-01-01T06:00:00.000Z") + (hour * 3600) }# because Sequel converts the column values to Time instances.
+ actual = queue.size.times.map { queue.pop.get("custom_time").time }
+ expect(actual).to eq(expected)
+ plugin.stop
+ end
+ Timecop.travel(DateTime.iso8601("2015-01-02T02:20:00.000Z")) do
+ # simulate a run 10 minutes later
+ plugin.run(queue)
+ expect(queue.size).to eq(0) # no new records
+ plugin.stop
+ # now add records
+ db[:test_table].insert(:num => 11, :custom_time => "2015-01-01 20:20:20", :created_at => Time.now.utc)
+ db[:test_table].insert(:num => 12, :custom_time => "2015-01-01 21:21:21", :created_at => Time.now.utc)
+ end
+ Timecop.travel(DateTime.iso8601("2015-01-02T03:30:00.000Z")) do
+ # simulate another run later than the custom time of the last record
+ plugin.run(queue)
+ expect(queue.size).to eq(2)
+ plugin.stop
+ end
+ event = queue.pop
+ expect(event.get("num")).to eq(11)
+ expect(event.get("custom_time").time).to eq(Time.iso8601("2015-01-02T02:20:20.000Z"))
+ event = queue.pop
+ expect(event.get("num")).to eq(12)
+ expect(event.get("custom_time").time).to eq(Time.iso8601("2015-01-02T03:21:21.000Z"))
end
+ end
- plugin.register
+ context "when fetching time data and the tracking column is set, sql_last_value is sourced from a column, sub-second precision is maintained" do
+ let(:settings) do
+ {
+ "statement" => "SELECT * from test1_table WHERE custom_time > :sql_last_value ORDER BY custom_time",
+ "use_column_value" => true,
+ "tracking_column" => "custom_time",
+ "tracking_column_type" => "timestamp",
+ "last_run_metadata_path" => Stud::Temporary.pathname
+ }
+ end
- plugin.run(queue)
- expected = ["2015-01-01T16:00:00.000Z",
- "2015-01-01T17:00:00.000Z",
- "2015-01-01T18:00:00.000Z",
- "2015-01-01T19:00:00.000Z",
- "2015-01-01T20:00:00.000Z",
- "2015-01-01T21:00:00.000Z",
- "2015-01-01T22:00:00.000Z",
- "2015-01-01T23:00:00.000Z",
- "2015-01-02T00:00:00.000Z",
- "2015-01-02T01:00:00.000Z",
- "2015-01-02T02:00:00.000Z"].map { |i| Time.iso8601(i) }
- actual = queue.size.times.map { queue.pop.get("custom_time").time }
- expect(actual).to eq(expected)
- plugin.stop
+ let(:msecs) { [111, 122, 233, 244, 355, 366, 477, 488, 599, 611, 722] }
- plugin.run(queue)
- expect(queue.size).to eq(0)
- db[:test_table].insert(:num => 11, :custom_time => "2015-01-01 11:00:00", :created_at => Time.now.utc)
- db[:test_table].insert(:num => 12, :custom_time => "2015-01-01 21:00:00", :created_at => Time.now.utc)
- plugin.run(queue)
- expect(queue.size).to eq(1)
- event = queue.pop
- expect(event.get("num")).to eq(12)
- expect(event.get("custom_time").time).to eq(Time.iso8601("2015-01-02T03:00:00.000Z"))
- p settings
+ it "should convert the time to reflect the timezone " do
+ # Sequel only does the *correct* timezone calc on a DateTime instance
+ last_run_value = DateTime.iso8601("2000-01-01T00:00:00.987Z")
+ File.write(settings["last_run_metadata_path"], YAML.dump(last_run_value))
+
+ hours.each_with_index do |i, j|
+ db[:test1_table].insert(:num => i, :custom_time => "2015-01-01 #{i}:00:00.#{msecs[j]}", :created_at => Time.now.utc)
+ end
+
+ plugin.register
+
+ plugin.run(queue)
+ expected = hours.map.with_index {|hour, i| Time.iso8601("2015-01-01T06:00:00.000Z") + (hour * 3600 + (msecs[i] / 1000.0)) }
+ actual = queue.size.times.map { queue.pop.get("custom_time").time }
+ expect(actual).to eq(expected)
+ plugin.stop
+ last_run_value = YAML.load(File.read(settings["last_run_metadata_path"]))
+ expect(last_run_value).to be_a(DateTime)
+ expect(last_run_value.strftime("%F %T.%N %Z")).to eq("2015-01-02 02:00:00.722000000 +00:00")
+
+ plugin.run(queue)
+ plugin.stop
+ db[:test1_table].insert(:num => 11, :custom_time => "2015-01-01 11:00:00.099", :created_at => Time.now.utc)
+ db[:test1_table].insert(:num => 12, :custom_time => "2015-01-01 21:00:00.811", :created_at => Time.now.utc)
+ expect(queue.size).to eq(0)
+ plugin.run(queue)
+ expect(queue.size).to eq(1)
+ event = queue.pop
+ plugin.stop
+ expect(event.get("num")).to eq(12)
+ expect(event.get("custom_time").time).to eq(Time.iso8601("2015-01-02T03:00:00.811Z"))
+ last_run_value = YAML.load(File.read(settings["last_run_metadata_path"]))
+ expect(last_run_value).to be_a(DateTime)
+ # verify that sub-seconds are recorded to the file
+ expect(last_run_value.strftime("%F %T.%N %Z")).to eq("2015-01-02 03:00:00.811000000 +00:00")
+ end
end
end
context "when fetching time data without jdbc_default_timezone set" do
let(:mixin_settings) do
@@ -477,20 +562,20 @@
it "should successfully update sql_last_value" do
test_table = db[:test_table]
plugin.run(queue)
- expect(plugin.instance_variable_get("@sql_last_value")).to eq(0)
+ expect(plugin.instance_variable_get("@value_tracker").value).to eq(0)
test_table.insert(:num => nums[0], :created_at => Time.now.utc)
test_table.insert(:num => nums[1], :created_at => Time.now.utc)
plugin.run(queue)
- expect(plugin.instance_variable_get("@sql_last_value")).to eq(20)
+ expect(plugin.instance_variable_get("@value_tracker").value).to eq(20)
test_table.insert(:num => nums[2], :created_at => Time.now.utc)
test_table.insert(:num => nums[3], :created_at => Time.now.utc)
test_table.insert(:num => nums[4], :created_at => Time.now.utc)
plugin.run(queue)
- expect(plugin.instance_variable_get("@sql_last_value")).to eq(50)
+ expect(plugin.instance_variable_get("@value_tracker").value).to eq(50)
end
end
context "when iteratively running plugin#run with timestamp tracking column with column value" do
let(:mixin_settings) do
@@ -520,21 +605,21 @@
it "should successfully update sql_last_value" do
test_table = db[:test_table]
plugin.run(queue)
- expect(plugin.instance_variable_get("@sql_last_value")).to eq(Time.parse("1970-01-01 00:00:00.000000000 +0000"))
+ expect(plugin.instance_variable_get("@value_tracker").value).to eq(Time.parse("1970-01-01 00:00:00.000000000 +0000"))
test_table.insert(:num => nums[0], :created_at => Time.now.utc, :custom_time => times[0])
test_table.insert(:num => nums[1], :created_at => Time.now.utc, :custom_time => times[1])
plugin.run(queue)
- expect(plugin.instance_variable_get("@sql_last_value").class).to eq(Time.parse(times[0]).class)
- expect(plugin.instance_variable_get("@sql_last_value")).to eq(Time.parse(times[1]))
+ expect(plugin.instance_variable_get("@value_tracker").value.class).to eq(Time.parse(times[0]).class)
+ expect(plugin.instance_variable_get("@value_tracker").value).to eq(Time.parse(times[1]))
test_table.insert(:num => nums[2], :created_at => Time.now.utc, :custom_time => times[2])
test_table.insert(:num => nums[3], :created_at => Time.now.utc, :custom_time => times[3])
test_table.insert(:num => nums[4], :created_at => Time.now.utc, :custom_time => times[4])
plugin.run(queue)
- expect(plugin.instance_variable_get("@sql_last_value")).to eq(Time.parse(times[4]))
+ expect(plugin.instance_variable_get("@value_tracker").value).to eq(Time.parse(times[4]))
end
end
context "when iteratively running plugin#run with tracking_column and stored metadata" do
let(:mixin_settings) do
@@ -564,23 +649,23 @@
it "should successfully update sql_last_value and only add appropriate events" do
test_table = db[:test_table]
plugin.run(queue)
- expect(plugin.instance_variable_get("@sql_last_value")).to eq(20)
+ expect(plugin.instance_variable_get("@value_tracker").value).to eq(20)
expect(queue.length).to eq(0) # Shouldn't grab anything here.
test_table.insert(:num => nums[0], :created_at => Time.now.utc)
test_table.insert(:num => nums[1], :created_at => Time.now.utc)
plugin.run(queue)
expect(queue.length).to eq(0) # Shouldn't grab anything here either.
- expect(plugin.instance_variable_get("@sql_last_value")).to eq(20)
+ expect(plugin.instance_variable_get("@value_tracker").value).to eq(20)
test_table.insert(:num => nums[2], :created_at => Time.now.utc)
test_table.insert(:num => nums[3], :created_at => Time.now.utc)
test_table.insert(:num => nums[4], :created_at => Time.now.utc)
plugin.run(queue)
expect(queue.length).to eq(3) # Only values greater than 20 should be grabbed.
- expect(plugin.instance_variable_get("@sql_last_value")).to eq(50)
+ expect(plugin.instance_variable_get("@value_tracker").value).to eq(50)
end
end
context "when iteratively running plugin#run with BAD tracking_column and stored metadata" do
let(:mixin_settings) do
@@ -610,23 +695,23 @@
it "should send a warning and not update sql_last_value" do
test_table = db[:test_table]
plugin.run(queue)
- expect(plugin.instance_variable_get("@sql_last_value")).to eq(20)
+ expect(plugin.instance_variable_get("@value_tracker").value).to eq(20)
expect(queue.length).to eq(0) # Shouldn't grab anything here.
test_table.insert(:num => nums[0], :created_at => Time.now.utc)
test_table.insert(:num => nums[1], :created_at => Time.now.utc)
plugin.run(queue)
expect(queue.length).to eq(0) # Shouldn't grab anything here either.
- expect(plugin.instance_variable_get("@sql_last_value")).to eq(20)
+ expect(plugin.instance_variable_get("@value_tracker").value).to eq(20)
test_table.insert(:num => nums[2], :created_at => Time.now.utc)
test_table.insert(:num => nums[3], :created_at => Time.now.utc)
test_table.insert(:num => nums[4], :created_at => Time.now.utc)
plugin.run(queue)
expect(queue.length).to eq(3) # Only values greater than 20 should be grabbed.
- expect(plugin.instance_variable_get("@sql_last_value")).to eq(20)
+ expect(plugin.instance_variable_get("@value_tracker").value).to eq(20)
expect(plugin.instance_variable_get("@tracking_column_warning_sent")).to eq(true)
end
end
context "when previous runs are to be respected upon successful query execution (by time)" do
@@ -648,11 +733,11 @@
end
it "should respect last run metadata" do
plugin.run(queue)
- expect(plugin.instance_variable_get("@sql_last_value")).to be > last_run_time
+ expect(plugin.instance_variable_get("@value_tracker").value).to be > last_run_time
end
end
context "when previous runs are to be respected upon successful query execution (by column)" do
@@ -675,11 +760,11 @@
end
it "metadata should equal last_run_value" do
plugin.run(queue)
- expect(plugin.instance_variable_get("@sql_last_value")).to eq(last_run_value)
+ expect(plugin.instance_variable_get("@value_tracker").value).to eq(last_run_value)
end
end
context "when previous runs are to be respected upon query failure (by time)" do
let(:settings) do
@@ -699,11 +784,11 @@
end
it "should not respect last run metadata" do
plugin.run(queue)
- expect(plugin.instance_variable_get("@sql_last_value")).to eq(last_run_time)
+ expect(plugin.instance_variable_get("@value_tracker").value).to eq(last_run_time)
end
end
context "when previous runs are to be respected upon query failure (by column)" do
let(:settings) do
@@ -726,11 +811,11 @@
end
it "metadata should still reflect last value" do
plugin.run(queue)
- expect(plugin.instance_variable_get("@sql_last_value")).to eq(last_run_value)
+ expect(plugin.instance_variable_get("@value_tracker").value).to eq(last_run_value)
end
end
context "when doing a clean run (by time)" do
@@ -752,11 +837,11 @@
after do
plugin.stop
end
it "should ignore last run metadata if :clean_run set to true" do
- expect(plugin.instance_variable_get("@sql_last_value")).to eq(Time.at(0).utc)
+ expect(plugin.instance_variable_get("@value_tracker").value).to eq(Time.at(0).utc)
end
end
context "when doing a clean run (by value)" do
@@ -780,10 +865,10 @@
after do
plugin.stop
end
it "should ignore last run metadata if :clean_run set to true" do
- expect(plugin.instance_variable_get("@sql_last_value")).to eq(0)
+ expect(plugin.instance_variable_get("@value_tracker").value).to eq(0)
end
end
context "when state is not to be persisted" do