lib/logstash/plugin_mixins/jdbc/jdbc.rb in logstash-input-jdbc-4.3.13 vs lib/logstash/plugin_mixins/jdbc/jdbc.rb in logstash-input-jdbc-4.3.14
- old
+ new
@@ -3,10 +3,11 @@
require "logstash/config/mixin"
require "time"
require "date"
require_relative "value_tracking"
require_relative "checked_count_logger"
+require_relative "wrapped_driver"
java_import java.util.concurrent.locks.ReentrantLock
# Tentative of abstracting JDBC logic to a mixin
# for potential reuse in other plugins (input/output)
@@ -96,10 +97,13 @@
# Maximum number of times to try connecting to database
config :connection_retry_attempts, :validate => :number, :default => 1
# Number of seconds to sleep between connection attempts
config :connection_retry_attempts_wait_time, :validate => :number, :default => 0.5
+
+ # give users the ability to force Sequel application side into using local timezone
+ config :plugin_timezone, :validate => ["local", "utc"], :default => "utc"
end
private
def jdbc_connect
opts = {
@@ -118,11 +122,12 @@
@logger.error("Failed to connect to database. #{@jdbc_pool_timeout} second timeout exceeded. Tried #{@connection_retry_attempts} times.")
raise e
else
@logger.error("Failed to connect to database. #{@jdbc_pool_timeout} second timeout exceeded. Trying again.")
end
- rescue Sequel::Error => e
+ # rescue Java::JavaSql::SQLException, ::Sequel::Error => e
+ rescue ::Sequel::Error => e
if retry_attempts <= 0
@logger.error("Unable to connect to database. Tried #{@connection_retry_attempts} times", :error_message => e.message, )
raise e
else
@logger.error("Unable to connect to database. Trying again", :error_message => e.message)
@@ -131,38 +136,59 @@
sleep(@connection_retry_attempts_wait_time)
end
end
private
- def load_drivers(drivers)
- drivers.each do |driver|
- begin
- class_loader = java.lang.ClassLoader.getSystemClassLoader().to_java(java.net.URLClassLoader)
- class_loader.add_url(java.io.File.new(driver).toURI().toURL())
- rescue => e
- @logger.error("Failed to load #{driver}", :exception => e)
- end
+
+ def load_drivers
+ return if @jdbc_driver_library.nil? || @jdbc_driver_library.empty?
+
+ driver_jars = @jdbc_driver_library.split(",")
+
+ # Needed for JDK 11 as the DriverManager has a different ClassLoader than Logstash
+ urls = java.net.URL[driver_jars.length].new
+
+ driver_jars.each_with_index do |driver, idx|
+ urls[idx] = java.io.File.new(driver).toURI().toURL()
end
+ ucl = java.net.URLClassLoader.new_instance(urls)
+ begin
+ klass = java.lang.Class.forName(@jdbc_driver_class.to_java(:string), true, ucl);
+ rescue Java::JavaLang::ClassNotFoundException => e
+ raise LogStash::Error, "Unable to find driver class via URLClassLoader in given driver jars: #{@jdbc_driver_class}"
+ end
+ begin
+ driver = klass.getConstructor().newInstance();
+ java.sql.DriverManager.register_driver(WrappedDriver.new(driver.to_java(java.sql.Driver)).to_java(java.sql.Driver))
+ rescue Java::JavaSql::SQLException => e
+ raise LogStash::Error, "Unable to register driver with java.sql.DriverManager using WrappedDriver: #{@jdbc_driver_class}"
+ end
end
private
def open_jdbc_connection
require "java"
require "sequel"
require "sequel/adapters/jdbc"
- load_drivers(@jdbc_driver_library.split(",")) if @jdbc_driver_library
+ Sequel.application_timezone = @plugin_timezone.to_sym
+
begin
+ load_drivers
Sequel::JDBC.load_driver(@jdbc_driver_class)
+ rescue LogStash::Error => e
+ # raised in load_drivers, e.cause should be the caught Java exceptions
+ raise LogStash::PluginLoadingError, "#{e.message} and #{e.cause.message}"
rescue Sequel::AdapterNotFound => e
+ # fix this !!!
message = if @jdbc_driver_library.nil?
":jdbc_driver_library is not set, are you sure you included
the proper driver client libraries in your classpath?"
else
"Are you sure you've included the correct jdbc driver in :jdbc_driver_library?"
end
- raise LogStash::ConfigurationError, "#{e}. #{message}"
+ raise LogStash::PluginLoadingError, "#{e}. #{message}"
end
@database = jdbc_connect()
@database.extension(:pagination)
if @jdbc_default_timezone
@database.extension(:named_timezones)
@@ -173,10 +199,12 @@
@database.pool.connection_validation_timeout = @jdbc_validation_timeout
end
@database.fetch_size = @jdbc_fetch_size unless @jdbc_fetch_size.nil?
begin
@database.test_connection
+ rescue Java::JavaSql::SQLException => e
+ @logger.warn("Failed test_connection with java.sql.SQLException.", :exception => e)
rescue Sequel::DatabaseConnectionError => e
@logger.warn("Failed test_connection.", :exception => e)
close_jdbc_connection
#TODO return false and let the plugin raise a LogStash::ConfigurationError
@@ -214,26 +242,26 @@
end
end
public
def execute_statement(statement, parameters)
+ # sql_last_value has been set in params by caller
success = false
@connection_lock.lock
open_jdbc_connection
begin
params = symbolized_params(parameters)
query = @database[statement, params]
-
sql_last_value = @use_column_value ? @value_tracker.value : Time.now.utc
@tracking_column_warning_sent = false
@statement_logger.log_statement_parameters(query, statement, params)
perform_query(query) do |row|
sql_last_value = get_column_value(row) if @use_column_value
yield extract_values_from(row)
end
success = true
- rescue Sequel::DatabaseConnectionError, Sequel::DatabaseError => e
+ rescue Sequel::DatabaseConnectionError, Sequel::DatabaseError, Java::JavaSql::SQLException => e
@logger.warn("Exception when executing JDBC query", :exception => e)
else
@value_tracker.set_value(sql_last_value)
ensure
close_jdbc_connection
@@ -265,12 +293,12 @@
if !row.has_key?(@tracking_column.to_sym)
if !@tracking_column_warning_sent
@logger.warn("tracking_column not found in dataset.", :tracking_column => @tracking_column)
@tracking_column_warning_sent = true
end
- # If we can't find the tracking column, return the current value in the ivar
- @sql_last_value
+ # If we can't find the tracking column, return the current value_tracker value
+ @value_tracker.value
else
# Otherwise send the updated tracking column
row[@tracking_column.to_sym]
end
end
@@ -295,23 +323,19 @@
Hash[row.map { |k, v| [k.to_s, decorate_value(v)] }]
end
private
def decorate_value(value)
- if value.is_a?(Time)
+ case value
+ when Time
# transform it to LogStash::Timestamp as required by LS
LogStash::Timestamp.new(value)
- elsif value.is_a?(Date)
+ when Date, DateTime
LogStash::Timestamp.new(value.to_time)
- elsif value.is_a?(DateTime)
- # Manual timezone conversion detected.
- # This is slower, so we put it in as a conditional case.
- LogStash::Timestamp.new(Time.parse(value.to_s))
else
value
end
end
-
end
end end end