lib/logstash/plugin_mixins/jdbc.rb in logstash-input-jdbc-4.3.3 vs lib/logstash/plugin_mixins/jdbc.rb in logstash-input-jdbc-4.3.4

- old
+ new

@@ -101,11 +101,12 @@ private def jdbc_connect opts = { :user => @jdbc_user, :password => @jdbc_password.nil? ? nil : @jdbc_password.value, - :pool_timeout => @jdbc_pool_timeout + :pool_timeout => @jdbc_pool_timeout, + :keep_reference => false }.merge(@sequel_opts) retry_attempts = @connection_retry_attempts loop do retry_attempts -= 1 begin @@ -230,28 +231,16 @@ query = @database[statement, parameters] sql_last_value = @use_column_value ? @sql_last_value : Time.now.utc @tracking_column_warning_sent = false @logger.debug? and @logger.debug("Executing JDBC query", :statement => statement, :parameters => parameters, :count => query.count) - if @jdbc_paging_enabled - query.each_page(@jdbc_page_size) do |paged_dataset| - paged_dataset.each do |row| - sql_last_value = get_column_value(row) if @use_column_value - if @tracking_column_type=="timestamp" and @use_column_value and sql_last_value.is_a?(DateTime) - sql_last_value=Time.parse(sql_last_value.to_s) # Coerce the timestamp to a `Time` - end - yield extract_values_from(row) - end + perform_query(query) do |row| + sql_last_value = get_column_value(row) if @use_column_value + if @tracking_column_type=="timestamp" and @use_column_value and sql_last_value.is_a?(DateTime) + sql_last_value = sql_last_value.to_time # Coerce the timestamp to a `Time` end - else - query.each do |row| - sql_last_value = get_column_value(row) if @use_column_value - if @tracking_column_type=="timestamp" and @use_column_value and sql_last_value.is_a?(DateTime) - sql_last_value=Time.parse(sql_last_value.to_s) # Coerce the timestamp to a `Time` - end - yield extract_values_from(row) - end + yield extract_values_from(row) end success = true rescue Sequel::DatabaseConnectionError, Sequel::DatabaseError => e @logger.warn("Exception when executing JDBC query", :exception => e) else @@ -259,9 +248,27 @@ ensure close_jdbc_connection @connection_lock.unlock end return success + end + + # Performs the query, respecting our pagination settings, yielding once per row of data + # @param query [Sequel::Dataset] + # @yieldparam row [Hash{Symbol=>Object}] + private + def perform_query(query) + if @jdbc_paging_enabled + query.each_page(@jdbc_page_size) do |paged_dataset| + paged_dataset.each do |row| + yield row + end + end + else + query.each do |row| + yield row + end + end end public def get_column_value(row) if !row.has_key?(@tracking_column.to_sym)