spec/inputs/jdbc_spec.rb in logstash-input-jdbc-0.1.3 vs spec/inputs/jdbc_spec.rb in logstash-input-jdbc-1.0.0

- old
+ new

@@ -1,84 +1,304 @@ require "logstash/devutils/rspec/spec_helper" require "logstash/inputs/jdbc" require "jdbc/derby" +require "sequel" +require "sequel/adapters/jdbc" require "timecop" +require "stud/temporary" describe "jdbc" do - let(:mixin_settings) { {"jdbc_driver_class" => "org.apache.derby.jdbc.EmbeddedDriver", "jdbc_connection_string" => "jdbc:derby:memory:testdb;create=true"} } + let(:mixin_settings) { {"jdbc_user" => ENV['USER'], "jdbc_driver_class" => "org.apache.derby.jdbc.EmbeddedDriver", "jdbc_connection_string" => "jdbc:derby:memory:testdb;create=true"} } + let(:settings) { {} } + let(:plugin) { LogStash::Inputs::Jdbc.new(mixin_settings.merge(settings)) } + let(:queue) { Queue.new } + let (:db) do + Sequel.connect(mixin_settings['jdbc_connection_string'], :user=> nil, :password=> nil) + end before :each do Jdbc::Derby.load_driver + db.create_table :test_table do + DateTime :created_at + Integer :num + end end - it "should register and tear down" do - settings = {"statement" => "SELECT 1 as col1 FROM SYSIBM.SYSDUMMY1"} - plugin = LogStash::Plugin.lookup("input", "jdbc").new(mixin_settings.merge(settings)) - expect { plugin.register }.to_not raise_error - expect { plugin.teardown }.to_not raise_error + after :each do + db.drop_table(:test_table) end - it "should retrieve params correctly from Event" do - settings = {"statement" => "SELECT :num_param as num_param FROM SYSIBM.SYSDUMMY1", "parameters" => {"num_param" => 10} } - plugin = LogStash::Inputs::Jdbc.new(mixin_settings.merge(settings)) - plugin.register - q = Queue.new - plugin.run(q) - insist { q.size } == 1 - insist { q.pop['num_param'] } == settings['parameters']['num_param'] - plugin.teardown + context "when registering and tearing down" do + let(:settings) { {"statement" => "SELECT 1 as col1 FROM test_table"} } + + it "should register without raising exception" do + expect { plugin.register }.to_not raise_error + plugin.teardown + end + + it "should teardown without raising exception" do + plugin.register + expect { plugin.teardown }.to_not raise_error + end end - it "should properly schedule" do - settings = {"statement" => "SELECT 1 as num_param FROM SYSIBM.SYSDUMMY1", "schedule" => "* * * * *"} - plugin = LogStash::Inputs::Jdbc.new(mixin_settings.merge(settings)) - plugin.register - q = Queue.new - Timecop.travel(Time.new(2000)) - Timecop.scale(60) - runner = Thread.new do - plugin.run(q) + context "when neither statement and statement_filepath arguments are passed" do + it "should fail to register" do + expect{ plugin.register }.to raise_error(LogStash::ConfigurationError) end - sleep 3 - plugin.teardown - runner.kill - runner.join - insist { q.size } == 2 - Timecop.return end - it "should successfully iterate table with respect to field values" do - require "sequel" - require "sequel/adapters/jdbc" - Jdbc::Derby.load_driver - @database = Sequel.connect(mixin_settings['jdbc_connection_string'], :user=> nil, :password=> nil) - @database.create_table :test_table do - DateTime :created_at - Integer :num + context "when both statement and statement_filepath arguments are passed" do + let(:statement) { "SELECT * from test_table" } + let(:statement_file_path) { Stud::Temporary.pathname } + let(:settings) { { "statement_filepath" => statement_file_path, "statement" => statement } } + + it "should fail to register" do + expect{ plugin.register }.to raise_error(LogStash::ConfigurationError) end - test_table = @database[:test_table] - settings = {"statement" => "SELECT num, created_at FROM test_table WHERE created_at > :sql_last_start"} - plugin = LogStash::Inputs::Jdbc.new(mixin_settings.merge(settings)) - plugin.register - q = Queue.new + end - nums = [10, 20, 30, 40, 50] - plugin.run(q) - test_table.insert(:num => nums[0], :created_at => Time.now.utc) - test_table.insert(:num => nums[1], :created_at => Time.now.utc) - plugin.run(q) - test_table.insert(:num => nums[2], :created_at => Time.now.utc) - test_table.insert(:num => nums[3], :created_at => Time.now.utc) - test_table.insert(:num => nums[4], :created_at => Time.now.utc) - plugin.run(q) + context "when statement is passed in from a file" do + let(:statement) { "SELECT * from test_table" } + let(:statement_file_path) { Stud::Temporary.pathname } + let(:settings) { { "statement_filepath" => statement_file_path } } - actual_sum = 0 - until q.empty? do - actual_sum += q.pop['num'] + before do + File.write(statement_file_path, statement) + plugin.register end - plugin.teardown + after do + plugin.teardown + end - insist { actual_sum } == nums.inject{|sum,x| sum + x } + it "should read in statement from file" do + expect(plugin.statement).to eq(statement) + end + end + + context "when passing parameters" do + let(:settings) do + { + "statement" => "SELECT :num_param as num_param FROM SYSIBM.SYSDUMMY1", + "parameters" => {"num_param" => 10} + } + end + + before do + plugin.register + end + + after do + plugin.teardown + end + + it "should retrieve params correctly from Event" do + plugin.run(queue) + expect(queue.pop['num_param']).to eq(settings['parameters']['num_param']) + end + end + + context "when scheduling" do + let(:settings) { {"statement" => "SELECT 1 as num_param FROM SYSIBM.SYSDUMMY1", "schedule" => "* * * * * UTC"} } + + before do + plugin.register + end + + it "should properly schedule" do + Timecop.travel(Time.new(2000)) + Timecop.scale(60) + runner = Thread.new do + plugin.run(queue) + end + sleep 3 + plugin.teardown + runner.kill + runner.join + expect(queue.size).to eq(2) + Timecop.return + end + + end + + context "when iterating result-set via paging" do + + let(:settings) do + { + "statement" => "SELECT * from test_table", + "jdbc_paging_enabled" => true, + "jdbc_page_size" => 20 + } + end + + let(:num_rows) { 1000 } + + before do + plugin.register + end + + after do + plugin.teardown + end + + it "should fetch all rows" do + num_rows.times do + db[:test_table].insert(:num => 1, :created_at => Time.now.utc) + end + + plugin.run(queue) + + expect(queue.size).to eq(num_rows) + end + + end + + context "when iteratively running plugin#run" do + let(:settings) do + {"statement" => "SELECT num, created_at FROM test_table WHERE created_at > :sql_last_start"} + end + + let(:nums) { [10, 20, 30, 40, 50] } + + before do + plugin.register + end + + after do + plugin.teardown + end + + it "should successfully iterate table with respect to field values" do + test_table = db[:test_table] + + plugin.run(queue) + test_table.insert(:num => nums[0], :created_at => Time.now.utc) + test_table.insert(:num => nums[1], :created_at => Time.now.utc) + plugin.run(queue) + test_table.insert(:num => nums[2], :created_at => Time.now.utc) + test_table.insert(:num => nums[3], :created_at => Time.now.utc) + test_table.insert(:num => nums[4], :created_at => Time.now.utc) + plugin.run(queue) + + actual_sum = 0 + until queue.empty? do + actual_sum += queue.pop['num'] + end + + expect(actual_sum).to eq(nums.inject{|sum,x| sum + x }) + end + end + + context "when previous runs are to be respected" do + + let(:settings) do + { "statement" => "SELECT * FROM test_table", + "last_run_metadata_path" => Stud::Temporary.pathname } + end + + let(:last_run_time) { Time.at(1).utc } + + before do + File.write(settings["last_run_metadata_path"], YAML.dump(last_run_time)) + plugin.register + end + + after do + plugin.teardown + end + + it "should respect last run metadata" do + expect(plugin.instance_variable_get("@sql_last_start")).to eq(last_run_time) + end + end + + context "when doing a clean run" do + + let(:settings) do + { + "statement" => "SELECT * FROM test_table", + "last_run_metadata_path" => Stud::Temporary.pathname, + "clean_run" => true + } + end + + let(:last_run_time) { Time.at(1).utc } + + before do + File.write(settings["last_run_metadata_path"], YAML.dump(last_run_time)) + plugin.register + end + + after do + plugin.teardown + end + + it "should ignore last run metadata if :clean_run set to true" do + expect(plugin.instance_variable_get("@sql_last_start")).to eq(Time.at(0).utc) + end + end + + context "when state is not to be persisted" do + let(:settings) do + { + "statement" => "SELECT * FROM test_table", + "last_run_metadata_path" => Stud::Temporary.pathname, + "record_last_run" => false + } + end + + before do + plugin.register + end + + after do + plugin.teardown + end + + it "should not save state if :record_last_run is false" do + expect(File).not_to exist(settings["last_run_metadata_path"]) + end + end + + context "when setting fetch size" do + + let(:settings) do + { + "statement" => "SELECT * from test_table", + "jdbc_fetch_size" => 1 + } + end + + let(:num_rows) { 10 } + + before do + num_rows.times do + db[:test_table].insert(:num => 1, :created_at => Time.now.utc) + end + + plugin.register + end + + after do + plugin.teardown + end + + it "should fetch all rows" do + plugin.run(queue) + expect(queue.size).to eq(num_rows) + end + end + + context "when driver is not found" do + let(:settings) { { "statement" => "SELECT * FROM test_table" } } + + before do + mixin_settings['jdbc_driver_class'] = "org.not.ExistsDriver" + end + + it "should fail" do + expect { plugin.register }.to raise_error(LogStash::ConfigurationError) + end end end