lib/logstash/plugin_mixins/jdbc/jdbc.rb in logstash-input-jdbc-4.3.14 vs lib/logstash/plugin_mixins/jdbc/jdbc.rb in logstash-input-jdbc-4.3.16

- old
+ new

@@ -4,10 +4,11 @@ require "time" require "date" require_relative "value_tracking" require_relative "checked_count_logger" require_relative "wrapped_driver" +require_relative "statement_handler" java_import java.util.concurrent.locks.ReentrantLock # Tentative of abstracting JDBC logic to a mixin # for potential reuse in other plugins (input/output) @@ -169,26 +170,28 @@ require "java" require "sequel" require "sequel/adapters/jdbc" Sequel.application_timezone = @plugin_timezone.to_sym - - begin - load_drivers - Sequel::JDBC.load_driver(@jdbc_driver_class) - rescue LogStash::Error => e - # raised in load_drivers, e.cause should be the caught Java exceptions - raise LogStash::PluginLoadingError, "#{e.message} and #{e.cause.message}" - rescue Sequel::AdapterNotFound => e - # fix this !!! - message = if @jdbc_driver_library.nil? - ":jdbc_driver_library is not set, are you sure you included - the proper driver client libraries in your classpath?" - else - "Are you sure you've included the correct jdbc driver in :jdbc_driver_library?" + if @drivers_loaded.false? + begin + load_drivers + Sequel::JDBC.load_driver(@jdbc_driver_class) + rescue LogStash::Error => e + # raised in load_drivers, e.cause should be the caught Java exceptions + raise LogStash::PluginLoadingError, "#{e.message} and #{e.cause.message}" + rescue Sequel::AdapterNotFound => e + # fix this !!! + message = if @jdbc_driver_library.nil? + ":jdbc_driver_library is not set, are you sure you included + the proper driver client libraries in your classpath?" + else + "Are you sure you've included the correct jdbc driver in :jdbc_driver_library?" + end + raise LogStash::PluginLoadingError, "#{e}. #{message}" end - raise LogStash::PluginLoadingError, "#{e}. #{message}" + @drivers_loaded.make_true end @database = jdbc_connect() @database.extension(:pagination) if @jdbc_default_timezone @database.extension(:named_timezones) @@ -224,10 +227,11 @@ end public def prepare_jdbc_connection @connection_lock = ReentrantLock.new + @drivers_loaded = Concurrent::AtomicBoolean.new end public def close_jdbc_connection begin @@ -241,22 +245,18 @@ @connection_lock.unlock end end public - def execute_statement(statement, parameters) - # sql_last_value has been set in params by caller + def execute_statement success = false @connection_lock.lock open_jdbc_connection begin - params = symbolized_params(parameters) - query = @database[statement, params] sql_last_value = @use_column_value ? @value_tracker.value : Time.now.utc @tracking_column_warning_sent = false - @statement_logger.log_statement_parameters(query, statement, params) - perform_query(query) do |row| + @statement_handler.perform_query(@database, @value_tracker.value) do |row| sql_last_value = get_column_value(row) if @use_column_value yield extract_values_from(row) end success = true rescue Sequel::DatabaseConnectionError, Sequel::DatabaseError, Java::JavaSql::SQLException => e @@ -268,58 +268,26 @@ @connection_lock.unlock end return success end -# Performs the query, respecting our pagination settings, yielding once per row of data -# @param query [Sequel::Dataset] -# @yieldparam row [Hash{Symbol=>Object}] - private - def perform_query(query) - if @jdbc_paging_enabled - query.each_page(@jdbc_page_size) do |paged_dataset| - paged_dataset.each do |row| - yield row - end - end - else - query.each do |row| - yield row - end - end - end - public def get_column_value(row) if !row.has_key?(@tracking_column.to_sym) if !@tracking_column_warning_sent @logger.warn("tracking_column not found in dataset.", :tracking_column => @tracking_column) @tracking_column_warning_sent = true end - # If we can't find the tracking column, return the current value_tracker value + # If we can't find the tracking column, return the current value in the ivar @value_tracker.value else # Otherwise send the updated tracking column row[@tracking_column.to_sym] end end -# Symbolize parameters keys to use with Sequel private - def symbolized_params(parameters) - parameters.inject({}) do |hash,(k,v)| - case v - when LogStash::Timestamp - hash[k.to_sym] = v.time - else - hash[k.to_sym] = v - end - hash - end - end - - private -#Stringify row keys and decorate values when necessary + #Stringify row keys and decorate values when necessary def extract_values_from(row) Hash[row.map { |k, v| [k.to_s, decorate_value(v)] }] end private