lib/logstash/plugin_mixins/jdbc/jdbc.rb in logstash-input-jdbc-4.3.14 vs lib/logstash/plugin_mixins/jdbc/jdbc.rb in logstash-input-jdbc-4.3.16
- old
+ new
@@ -4,10 +4,11 @@
require "time"
require "date"
require_relative "value_tracking"
require_relative "checked_count_logger"
require_relative "wrapped_driver"
+require_relative "statement_handler"
java_import java.util.concurrent.locks.ReentrantLock
# Tentative of abstracting JDBC logic to a mixin
# for potential reuse in other plugins (input/output)
@@ -169,26 +170,28 @@
require "java"
require "sequel"
require "sequel/adapters/jdbc"
Sequel.application_timezone = @plugin_timezone.to_sym
-
- begin
- load_drivers
- Sequel::JDBC.load_driver(@jdbc_driver_class)
- rescue LogStash::Error => e
- # raised in load_drivers, e.cause should be the caught Java exceptions
- raise LogStash::PluginLoadingError, "#{e.message} and #{e.cause.message}"
- rescue Sequel::AdapterNotFound => e
- # fix this !!!
- message = if @jdbc_driver_library.nil?
- ":jdbc_driver_library is not set, are you sure you included
- the proper driver client libraries in your classpath?"
- else
- "Are you sure you've included the correct jdbc driver in :jdbc_driver_library?"
+ if @drivers_loaded.false?
+ begin
+ load_drivers
+ Sequel::JDBC.load_driver(@jdbc_driver_class)
+ rescue LogStash::Error => e
+ # raised in load_drivers, e.cause should be the caught Java exceptions
+ raise LogStash::PluginLoadingError, "#{e.message} and #{e.cause.message}"
+ rescue Sequel::AdapterNotFound => e
+ # fix this !!!
+ message = if @jdbc_driver_library.nil?
+ ":jdbc_driver_library is not set, are you sure you included
+ the proper driver client libraries in your classpath?"
+ else
+ "Are you sure you've included the correct jdbc driver in :jdbc_driver_library?"
+ end
+ raise LogStash::PluginLoadingError, "#{e}. #{message}"
end
- raise LogStash::PluginLoadingError, "#{e}. #{message}"
+ @drivers_loaded.make_true
end
@database = jdbc_connect()
@database.extension(:pagination)
if @jdbc_default_timezone
@database.extension(:named_timezones)
@@ -224,10 +227,11 @@
end
public
def prepare_jdbc_connection
@connection_lock = ReentrantLock.new
+ @drivers_loaded = Concurrent::AtomicBoolean.new
end
public
def close_jdbc_connection
begin
@@ -241,22 +245,18 @@
@connection_lock.unlock
end
end
public
- def execute_statement(statement, parameters)
- # sql_last_value has been set in params by caller
+ def execute_statement
success = false
@connection_lock.lock
open_jdbc_connection
begin
- params = symbolized_params(parameters)
- query = @database[statement, params]
sql_last_value = @use_column_value ? @value_tracker.value : Time.now.utc
@tracking_column_warning_sent = false
- @statement_logger.log_statement_parameters(query, statement, params)
- perform_query(query) do |row|
+ @statement_handler.perform_query(@database, @value_tracker.value) do |row|
sql_last_value = get_column_value(row) if @use_column_value
yield extract_values_from(row)
end
success = true
rescue Sequel::DatabaseConnectionError, Sequel::DatabaseError, Java::JavaSql::SQLException => e
@@ -268,58 +268,26 @@
@connection_lock.unlock
end
return success
end
-# Performs the query, respecting our pagination settings, yielding once per row of data
-# @param query [Sequel::Dataset]
-# @yieldparam row [Hash{Symbol=>Object}]
- private
- def perform_query(query)
- if @jdbc_paging_enabled
- query.each_page(@jdbc_page_size) do |paged_dataset|
- paged_dataset.each do |row|
- yield row
- end
- end
- else
- query.each do |row|
- yield row
- end
- end
- end
-
public
def get_column_value(row)
if !row.has_key?(@tracking_column.to_sym)
if !@tracking_column_warning_sent
@logger.warn("tracking_column not found in dataset.", :tracking_column => @tracking_column)
@tracking_column_warning_sent = true
end
- # If we can't find the tracking column, return the current value_tracker value
+ # If we can't find the tracking column, return the current value in the ivar
@value_tracker.value
else
# Otherwise send the updated tracking column
row[@tracking_column.to_sym]
end
end
-# Symbolize parameters keys to use with Sequel
private
- def symbolized_params(parameters)
- parameters.inject({}) do |hash,(k,v)|
- case v
- when LogStash::Timestamp
- hash[k.to_sym] = v.time
- else
- hash[k.to_sym] = v
- end
- hash
- end
- end
-
- private
-#Stringify row keys and decorate values when necessary
+ #Stringify row keys and decorate values when necessary
def extract_values_from(row)
Hash[row.map { |k, v| [k.to_s, decorate_value(v)] }]
end
private