lib/logstash/plugin_mixins/jdbc.rb in logstash-input-jdbc-4.3.3 vs lib/logstash/plugin_mixins/jdbc.rb in logstash-input-jdbc-4.3.4
- old
+ new
@@ -101,11 +101,12 @@
private
def jdbc_connect
opts = {
:user => @jdbc_user,
:password => @jdbc_password.nil? ? nil : @jdbc_password.value,
- :pool_timeout => @jdbc_pool_timeout
+ :pool_timeout => @jdbc_pool_timeout,
+ :keep_reference => false
}.merge(@sequel_opts)
retry_attempts = @connection_retry_attempts
loop do
retry_attempts -= 1
begin
@@ -230,28 +231,16 @@
query = @database[statement, parameters]
sql_last_value = @use_column_value ? @sql_last_value : Time.now.utc
@tracking_column_warning_sent = false
@logger.debug? and @logger.debug("Executing JDBC query", :statement => statement, :parameters => parameters, :count => query.count)
- if @jdbc_paging_enabled
- query.each_page(@jdbc_page_size) do |paged_dataset|
- paged_dataset.each do |row|
- sql_last_value = get_column_value(row) if @use_column_value
- if @tracking_column_type=="timestamp" and @use_column_value and sql_last_value.is_a?(DateTime)
- sql_last_value=Time.parse(sql_last_value.to_s) # Coerce the timestamp to a `Time`
- end
- yield extract_values_from(row)
- end
+ perform_query(query) do |row|
+ sql_last_value = get_column_value(row) if @use_column_value
+ if @tracking_column_type=="timestamp" and @use_column_value and sql_last_value.is_a?(DateTime)
+ sql_last_value = sql_last_value.to_time # Coerce the timestamp to a `Time`
end
- else
- query.each do |row|
- sql_last_value = get_column_value(row) if @use_column_value
- if @tracking_column_type=="timestamp" and @use_column_value and sql_last_value.is_a?(DateTime)
- sql_last_value=Time.parse(sql_last_value.to_s) # Coerce the timestamp to a `Time`
- end
- yield extract_values_from(row)
- end
+ yield extract_values_from(row)
end
success = true
rescue Sequel::DatabaseConnectionError, Sequel::DatabaseError => e
@logger.warn("Exception when executing JDBC query", :exception => e)
else
@@ -259,9 +248,27 @@
ensure
close_jdbc_connection
@connection_lock.unlock
end
return success
+ end
+
+ # Performs the query, respecting our pagination settings, yielding once per row of data
+ # @param query [Sequel::Dataset]
+ # @yieldparam row [Hash{Symbol=>Object}]
+ private
+ def perform_query(query)
+ if @jdbc_paging_enabled
+ query.each_page(@jdbc_page_size) do |paged_dataset|
+ paged_dataset.each do |row|
+ yield row
+ end
+ end
+ else
+ query.each do |row|
+ yield row
+ end
+ end
end
public
def get_column_value(row)
if !row.has_key?(@tracking_column.to_sym)