lib/logstash/plugin_mixins/jdbc.rb in logstash-input-jdbc-4.3.1 vs lib/logstash/plugin_mixins/jdbc.rb in logstash-input-jdbc-4.3.2

- old
+ new

@@ -2,10 +2,12 @@ # TAKEN FROM WIIBAA require "logstash/config/mixin" require "time" require "date" +java_import java.util.concurrent.locks.ReentrantLock + # Tentative of abstracting JDBC logic to a mixin # for potential reuse in other plugins (input/output) module LogStash::PluginMixins::Jdbc # This method is called when someone includes this module @@ -189,10 +191,11 @@ end end public def prepare_jdbc_connection + @connection_lock = ReentrantLock.new if @use_column_value case @tracking_column_type when "numeric" @sql_last_value = 0 when "timestamp" @@ -204,53 +207,61 @@ end # def prepare_jdbc_connection public def close_jdbc_connection begin + # pipeline restarts can also close the jdbc connection, block until the current executing statement is finished to avoid leaking connections + # connections in use won't really get closed + @connection_lock.lock @database.disconnect if @database rescue => e @logger.warn("Failed to close connection", :exception => e) + ensure + @connection_lock.unlock end end public def execute_statement(statement, parameters) - success = false - open_jdbc_connection - begin - parameters = symbolized_params(parameters) - query = @database[statement, parameters] - sql_last_value = @use_column_value ? @sql_last_value : Time.now.utc - @tracking_column_warning_sent = false - @logger.debug? and @logger.debug("Executing JDBC query", :statement => statement, :parameters => parameters, :count => query.count) + success = false + @connection_lock.lock + open_jdbc_connection + begin + parameters = symbolized_params(parameters) + query = @database[statement, parameters] + sql_last_value = @use_column_value ? @sql_last_value : Time.now.utc + @tracking_column_warning_sent = false + @logger.debug? and @logger.debug("Executing JDBC query", :statement => statement, :parameters => parameters, :count => query.count) - if @jdbc_paging_enabled - query.each_page(@jdbc_page_size) do |paged_dataset| - paged_dataset.each do |row| + if @jdbc_paging_enabled + query.each_page(@jdbc_page_size) do |paged_dataset| + paged_dataset.each do |row| + sql_last_value = get_column_value(row) if @use_column_value + if @tracking_column_type=="timestamp" and @use_column_value and sql_last_value.is_a?(DateTime) + sql_last_value=Time.parse(sql_last_value.to_s) # Coerce the timestamp to a `Time` + end + yield extract_values_from(row) + end + end + else + query.each do |row| sql_last_value = get_column_value(row) if @use_column_value if @tracking_column_type=="timestamp" and @use_column_value and sql_last_value.is_a?(DateTime) sql_last_value=Time.parse(sql_last_value.to_s) # Coerce the timestamp to a `Time` end yield extract_values_from(row) end end + success = true + rescue Sequel::DatabaseConnectionError, Sequel::DatabaseError => e + @logger.warn("Exception when executing JDBC query", :exception => e) else - query.each do |row| - sql_last_value = get_column_value(row) if @use_column_value - if @tracking_column_type=="timestamp" and @use_column_value and sql_last_value.is_a?(DateTime) - sql_last_value=Time.parse(sql_last_value.to_s) # Coerce the timestamp to a `Time` - end - yield extract_values_from(row) - end + @sql_last_value = sql_last_value + ensure + close_jdbc_connection + @connection_lock.unlock end - success = true - rescue Sequel::DatabaseConnectionError, Sequel::DatabaseError => e - @logger.warn("Exception when executing JDBC query", :exception => e) - else - @sql_last_value = sql_last_value - end - close_jdbc_connection - return success + return success end public def get_column_value(row) if !row.has_key?(@tracking_column.to_sym)