lib/logstash/plugin_mixins/jdbc.rb in logstash-input-jdbc-0.1.3 vs lib/logstash/plugin_mixins/jdbc.rb in logstash-input-jdbc-1.0.0
- old
+ new
@@ -11,59 +11,88 @@
# This method is called when someone includes this module
def self.included(base)
# Add these methods to the 'base' given.
base.extend(self)
base.setup_jdbc_config
-
- @sql_last_start = Time.at(0).utc
end
public
def setup_jdbc_config
# JDBC driver library path to third party driver library.
+ #
+ # If not provided, Plugin will look for the driver class in the Logstash Java classpath.
config :jdbc_driver_library, :validate => :path
# JDBC driver class to load, for example "oracle.jdbc.OracleDriver" or "org.apache.derby.jdbc.ClientDriver"
config :jdbc_driver_class, :validate => :string, :required => true
# JDBC connection string
config :jdbc_connection_string, :validate => :string, :required => true
# JDBC user
- config :jdbc_user, :validate => :string, :default => "tal"
+ config :jdbc_user, :validate => :string, :required => true
# JDBC password
config :jdbc_password, :validate => :password
+ # JDBC enable paging
+ #
+ # This will cause a sql statement to be broken up into multiple queries.
+ # Each query will use limits and offsets to collectively retrieve the full
+ # result-set. The limit size is set with `jdbc_page_size`.
+ #
+ # Be aware that ordering is not guaranteed between queries.
+ config :jdbc_paging_enabled, :validate => :boolean, :default => false
+
+ # JDBC page size
+ config :jdbc_page_size, :validate => :number, :default => 100000
+
+ # JDBC fetch size. if not provided, respective driver's default will be used
+ config :jdbc_fetch_size, :validate => :number
+
# Connection pool configuration.
# Validate connection before use.
config :jdbc_validate_connection, :validate => :boolean, :default => false
# Connection pool configuration.
# How often to validate a connection (in seconds)
- config :jdcb_validation_timeout, :validate => :number, :default => 3600
+ config :jdbc_validation_timeout, :validate => :number, :default => 3600
end
public
def prepare_jdbc_connection
require "java"
require "sequel"
require "sequel/adapters/jdbc"
require @jdbc_driver_library if @jdbc_driver_library
- Sequel::JDBC.load_driver(@jdbc_driver_class)
+ begin
+ Sequel::JDBC.load_driver(@jdbc_driver_class)
+ rescue Sequel::AdapterNotFound => e
+ message = if @jdbc_driver_library.nil?
+ ":jdbc_driver_library is not set, are you sure you included
+ the proper driver client libraries in your classpath?"
+ else
+ "Are you sure you've included the correct jdbc driver in :jdbc_driver_library?"
+ end
+ raise LogStash::ConfigurationError, "#{e}. #{message}"
+ end
@database = Sequel.connect(@jdbc_connection_string, :user=> @jdbc_user, :password=> @jdbc_password.nil? ? nil : @jdbc_password.value)
+ @database.extension(:pagination)
if @jdbc_validate_connection
@database.extension(:connection_validator)
- @database.pool.connection_validation_timeout = @jdcb_validation_timeout
+ @database.pool.connection_validation_timeout = @jdbc_validation_timeout
end
+ @database.fetch_size = @jdbc_fetch_size unless @jdbc_fetch_size.nil?
begin
@database.test_connection
rescue Sequel::DatabaseConnectionError => e
#TODO return false and let the plugin raise a LogStash::ConfigurationError
raise e
end
+
+ @sql_last_start = Time.at(0).utc
end # def prepare_jdbc_connection
public
def close_jdbc_connection
@database.disconnect if @database
@@ -75,12 +104,22 @@
begin
parameters = symbolized_params(parameters)
query = @database[statement, parameters]
@logger.debug? and @logger.debug("Executing JDBC query", :statement => statement, :parameters => parameters)
@sql_last_start = Time.now.utc
- query.all do |row|
- #Stringify row keys
- yield Hash[row.map { |k, v| [k.to_s, v] }]
+
+ if @jdbc_paging_enabled
+ query.each_page(@jdbc_page_size) do |paged_dataset|
+ paged_dataset.each do |row|
+ #Stringify row keys
+ yield Hash[row.map { |k, v| [k.to_s, v] }]
+ end
+ end
+ else
+ query.each do |row|
+ #Stringify row keys
+ yield Hash[row.map { |k, v| [k.to_s, v] }]
+ end
end
success = true
rescue Sequel::DatabaseConnectionError, Sequel::DatabaseError => e
@logger.warn("Exception when executing JDBC query", :exception => e)
end