lib/logstash/plugin_mixins/jdbc/jdbc.rb in logstash-input-jdbc-4.3.13 vs lib/logstash/plugin_mixins/jdbc/jdbc.rb in logstash-input-jdbc-4.3.14

- old
+ new

@@ -3,10 +3,11 @@ require "logstash/config/mixin" require "time" require "date" require_relative "value_tracking" require_relative "checked_count_logger" +require_relative "wrapped_driver" java_import java.util.concurrent.locks.ReentrantLock # Tentative of abstracting JDBC logic to a mixin # for potential reuse in other plugins (input/output) @@ -96,10 +97,13 @@ # Maximum number of times to try connecting to database config :connection_retry_attempts, :validate => :number, :default => 1 # Number of seconds to sleep between connection attempts config :connection_retry_attempts_wait_time, :validate => :number, :default => 0.5 + + # give users the ability to force Sequel application side into using local timezone + config :plugin_timezone, :validate => ["local", "utc"], :default => "utc" end private def jdbc_connect opts = { @@ -118,11 +122,12 @@ @logger.error("Failed to connect to database. #{@jdbc_pool_timeout} second timeout exceeded. Tried #{@connection_retry_attempts} times.") raise e else @logger.error("Failed to connect to database. #{@jdbc_pool_timeout} second timeout exceeded. Trying again.") end - rescue Sequel::Error => e + # rescue Java::JavaSql::SQLException, ::Sequel::Error => e + rescue ::Sequel::Error => e if retry_attempts <= 0 @logger.error("Unable to connect to database. Tried #{@connection_retry_attempts} times", :error_message => e.message, ) raise e else @logger.error("Unable to connect to database. Trying again", :error_message => e.message) @@ -131,38 +136,59 @@ sleep(@connection_retry_attempts_wait_time) end end private - def load_drivers(drivers) - drivers.each do |driver| - begin - class_loader = java.lang.ClassLoader.getSystemClassLoader().to_java(java.net.URLClassLoader) - class_loader.add_url(java.io.File.new(driver).toURI().toURL()) - rescue => e - @logger.error("Failed to load #{driver}", :exception => e) - end + + def load_drivers + return if @jdbc_driver_library.nil? || @jdbc_driver_library.empty? + + driver_jars = @jdbc_driver_library.split(",") + + # Needed for JDK 11 as the DriverManager has a different ClassLoader than Logstash + urls = java.net.URL[driver_jars.length].new + + driver_jars.each_with_index do |driver, idx| + urls[idx] = java.io.File.new(driver).toURI().toURL() end + ucl = java.net.URLClassLoader.new_instance(urls) + begin + klass = java.lang.Class.forName(@jdbc_driver_class.to_java(:string), true, ucl); + rescue Java::JavaLang::ClassNotFoundException => e + raise LogStash::Error, "Unable to find driver class via URLClassLoader in given driver jars: #{@jdbc_driver_class}" + end + begin + driver = klass.getConstructor().newInstance(); + java.sql.DriverManager.register_driver(WrappedDriver.new(driver.to_java(java.sql.Driver)).to_java(java.sql.Driver)) + rescue Java::JavaSql::SQLException => e + raise LogStash::Error, "Unable to register driver with java.sql.DriverManager using WrappedDriver: #{@jdbc_driver_class}" + end end private def open_jdbc_connection require "java" require "sequel" require "sequel/adapters/jdbc" - load_drivers(@jdbc_driver_library.split(",")) if @jdbc_driver_library + Sequel.application_timezone = @plugin_timezone.to_sym + begin + load_drivers Sequel::JDBC.load_driver(@jdbc_driver_class) + rescue LogStash::Error => e + # raised in load_drivers, e.cause should be the caught Java exceptions + raise LogStash::PluginLoadingError, "#{e.message} and #{e.cause.message}" rescue Sequel::AdapterNotFound => e + # fix this !!! message = if @jdbc_driver_library.nil? ":jdbc_driver_library is not set, are you sure you included the proper driver client libraries in your classpath?" else "Are you sure you've included the correct jdbc driver in :jdbc_driver_library?" end - raise LogStash::ConfigurationError, "#{e}. #{message}" + raise LogStash::PluginLoadingError, "#{e}. #{message}" end @database = jdbc_connect() @database.extension(:pagination) if @jdbc_default_timezone @database.extension(:named_timezones) @@ -173,10 +199,12 @@ @database.pool.connection_validation_timeout = @jdbc_validation_timeout end @database.fetch_size = @jdbc_fetch_size unless @jdbc_fetch_size.nil? begin @database.test_connection + rescue Java::JavaSql::SQLException => e + @logger.warn("Failed test_connection with java.sql.SQLException.", :exception => e) rescue Sequel::DatabaseConnectionError => e @logger.warn("Failed test_connection.", :exception => e) close_jdbc_connection #TODO return false and let the plugin raise a LogStash::ConfigurationError @@ -214,26 +242,26 @@ end end public def execute_statement(statement, parameters) + # sql_last_value has been set in params by caller success = false @connection_lock.lock open_jdbc_connection begin params = symbolized_params(parameters) query = @database[statement, params] - sql_last_value = @use_column_value ? @value_tracker.value : Time.now.utc @tracking_column_warning_sent = false @statement_logger.log_statement_parameters(query, statement, params) perform_query(query) do |row| sql_last_value = get_column_value(row) if @use_column_value yield extract_values_from(row) end success = true - rescue Sequel::DatabaseConnectionError, Sequel::DatabaseError => e + rescue Sequel::DatabaseConnectionError, Sequel::DatabaseError, Java::JavaSql::SQLException => e @logger.warn("Exception when executing JDBC query", :exception => e) else @value_tracker.set_value(sql_last_value) ensure close_jdbc_connection @@ -265,12 +293,12 @@ if !row.has_key?(@tracking_column.to_sym) if !@tracking_column_warning_sent @logger.warn("tracking_column not found in dataset.", :tracking_column => @tracking_column) @tracking_column_warning_sent = true end - # If we can't find the tracking column, return the current value in the ivar - @sql_last_value + # If we can't find the tracking column, return the current value_tracker value + @value_tracker.value else # Otherwise send the updated tracking column row[@tracking_column.to_sym] end end @@ -295,23 +323,19 @@ Hash[row.map { |k, v| [k.to_s, decorate_value(v)] }] end private def decorate_value(value) - if value.is_a?(Time) + case value + when Time # transform it to LogStash::Timestamp as required by LS LogStash::Timestamp.new(value) - elsif value.is_a?(Date) + when Date, DateTime LogStash::Timestamp.new(value.to_time) - elsif value.is_a?(DateTime) - # Manual timezone conversion detected. - # This is slower, so we put it in as a conditional case. - LogStash::Timestamp.new(Time.parse(value.to_s)) else value end end - end end end end