lib/logstash/plugin_mixins/jdbc.rb in logstash-input-jdbc-4.3.1 vs lib/logstash/plugin_mixins/jdbc.rb in logstash-input-jdbc-4.3.2
- old
+ new
@@ -2,10 +2,12 @@
# TAKEN FROM WIIBAA
require "logstash/config/mixin"
require "time"
require "date"
+java_import java.util.concurrent.locks.ReentrantLock
+
# Tentative of abstracting JDBC logic to a mixin
# for potential reuse in other plugins (input/output)
module LogStash::PluginMixins::Jdbc
# This method is called when someone includes this module
@@ -189,10 +191,11 @@
end
end
public
def prepare_jdbc_connection
+ @connection_lock = ReentrantLock.new
if @use_column_value
case @tracking_column_type
when "numeric"
@sql_last_value = 0
when "timestamp"
@@ -204,53 +207,61 @@
end # def prepare_jdbc_connection
public
def close_jdbc_connection
begin
+ # pipeline restarts can also close the jdbc connection, block until the current executing statement is finished to avoid leaking connections
+ # connections in use won't really get closed
+ @connection_lock.lock
@database.disconnect if @database
rescue => e
@logger.warn("Failed to close connection", :exception => e)
+ ensure
+ @connection_lock.unlock
end
end
public
def execute_statement(statement, parameters)
- success = false
- open_jdbc_connection
- begin
- parameters = symbolized_params(parameters)
- query = @database[statement, parameters]
- sql_last_value = @use_column_value ? @sql_last_value : Time.now.utc
- @tracking_column_warning_sent = false
- @logger.debug? and @logger.debug("Executing JDBC query", :statement => statement, :parameters => parameters, :count => query.count)
+ success = false
+ @connection_lock.lock
+ open_jdbc_connection
+ begin
+ parameters = symbolized_params(parameters)
+ query = @database[statement, parameters]
+ sql_last_value = @use_column_value ? @sql_last_value : Time.now.utc
+ @tracking_column_warning_sent = false
+ @logger.debug? and @logger.debug("Executing JDBC query", :statement => statement, :parameters => parameters, :count => query.count)
- if @jdbc_paging_enabled
- query.each_page(@jdbc_page_size) do |paged_dataset|
- paged_dataset.each do |row|
+ if @jdbc_paging_enabled
+ query.each_page(@jdbc_page_size) do |paged_dataset|
+ paged_dataset.each do |row|
+ sql_last_value = get_column_value(row) if @use_column_value
+ if @tracking_column_type=="timestamp" and @use_column_value and sql_last_value.is_a?(DateTime)
+ sql_last_value=Time.parse(sql_last_value.to_s) # Coerce the timestamp to a `Time`
+ end
+ yield extract_values_from(row)
+ end
+ end
+ else
+ query.each do |row|
sql_last_value = get_column_value(row) if @use_column_value
if @tracking_column_type=="timestamp" and @use_column_value and sql_last_value.is_a?(DateTime)
sql_last_value=Time.parse(sql_last_value.to_s) # Coerce the timestamp to a `Time`
end
yield extract_values_from(row)
end
end
+ success = true
+ rescue Sequel::DatabaseConnectionError, Sequel::DatabaseError => e
+ @logger.warn("Exception when executing JDBC query", :exception => e)
else
- query.each do |row|
- sql_last_value = get_column_value(row) if @use_column_value
- if @tracking_column_type=="timestamp" and @use_column_value and sql_last_value.is_a?(DateTime)
- sql_last_value=Time.parse(sql_last_value.to_s) # Coerce the timestamp to a `Time`
- end
- yield extract_values_from(row)
- end
+ @sql_last_value = sql_last_value
+ ensure
+ close_jdbc_connection
+ @connection_lock.unlock
end
- success = true
- rescue Sequel::DatabaseConnectionError, Sequel::DatabaseError => e
- @logger.warn("Exception when executing JDBC query", :exception => e)
- else
- @sql_last_value = sql_last_value
- end
- close_jdbc_connection
- return success
+ return success
end
public
def get_column_value(row)
if !row.has_key?(@tracking_column.to_sym)