lib/logstash/inputs/jdbc.rb in logstash-input-jdbc-4.0.2 vs lib/logstash/inputs/jdbc.rb in logstash-input-jdbc-4.1.0
- old
+ new
@@ -158,10 +158,30 @@
config :record_last_run, :validate => :boolean, :default => true
# Whether to force the lowercasing of identifier fields
config :lowercase_column_names, :validate => :boolean, :default => true
+ # The character encoding of all columns. Leave empty if the columns are already properly
+ # UTF-8 encoded. Per-column charsets given in `:columns_charset` override this setting.
+ config :charset, :validate => :string
+
+ # The character encoding for specific columns. This option will override the `:charset` option
+ # for the specified columns.
+ #
+ # Example:
+ # [source,ruby]
+ # ----------------------------------
+ # input {
+ # jdbc {
+ # ...
+ # columns_charset => { "column0" => "ISO-8859-1" }
+ # ...
+ # }
+ # }
+ # ----------------------------------
+ # This will only convert column0, whose original encoding is ISO-8859-1.
+ config :columns_charset, :validate => :hash, :default => {}
+
public
def register
require "rufus/scheduler"
prepare_jdbc_connection
@@ -171,10 +191,12 @@
if @tracking_column.nil?
raise(LogStash::ConfigurationError, "Must set :tracking_column if :use_column_value is true.")
end
end
+ @enable_encoding = !@charset.nil? || !@columns_charset.empty?
+
# load sql_last_value from file if exists
if @clean_run && File.exist?(@last_run_metadata_path)
File.delete(@last_run_metadata_path)
elsif File.exist?(@last_run_metadata_path)
@sql_last_value = YAML.load(File.read(@last_run_metadata_path))
@@ -189,10 +211,18 @@
if (@jdbc_password_filepath and @jdbc_password)
raise(LogStash::ConfigurationError, "Only one of :jdbc_password, :jdbc_password_filepath may be set at a time.")
end
@jdbc_password = File.read(@jdbc_password_filepath).strip if @jdbc_password_filepath
+
+ if enable_encoding?
+ @converters = {}
+ @columns_charset.each do |column_name, encoding|
+ @converters[encoding] = LogStash::Util::Charset.new(encoding)
+ end
+ @converters[@charset] = LogStash::Util::Charset.new(@charset) if @charset
+ end
end # def register
def run(queue)
if @schedule
@scheduler = Rufus::Scheduler.new(:max_work_threads => 1)
@@ -218,10 +248,14 @@
# Runs the configured statement and pushes an event onto +queue+ for every
# row that execute_statement yields.
#
# queue - destination that receives each event through <<
def execute_query(queue)
  # refresh the sql_last_value parameter before the statement runs
  @parameters['sql_last_value'] = @sql_last_value

  execute_statement(@statement, @parameters) do |row|
    if enable_encoding?
      # rebuild the row with string keys, passing every value through convert
      row = row.each_with_object({}) do |(column, raw), converted|
        converted[column.to_s] = convert(column, raw)
      end
    end

    event = LogStash::Event.new(row)
    decorate(event)
    queue << event
  end
end
@@ -230,6 +264,26 @@
if @record_last_run
File.write(@last_run_metadata_path, YAML.dump(@sql_last_value))
end
end
+ private
+
# Tells whether row values have to go through charset conversion.
#
# Simply exposes @enable_encoding, which register sets to true when @charset
# is given or @columns_charset is not empty.
def enable_encoding?
  @enable_encoding
end
+
# Makes sure the encoding is uniform over fields by re-encoding a single
# column value with the charset configured for it.
#
# Only String values are converted; every other value is returned untouched.
# A charset registered for the column in @columns_charset takes precedence
# over the global @charset. When neither applies, the value is returned as is.
#
# column_name - name of the column the value belongs to (String or Symbol)
# value       - the value read from the row
#
# Returns whatever the matching converter's #convert returns, or +value+
# unchanged when no conversion applies.
def convert(column_name, value)
  return value unless value.is_a?(String)

  # The exact key wins so existing lookups behave as before. The string
  # fallback lets a Symbol column name still match a string key such as
  # "column0" (the form used in the documented config example) instead of
  # silently falling through to the global charset.
  encoding = @columns_charset[column_name] ||
             @columns_charset[column_name.to_s] ||
             @charset
  return value unless encoding

  # @converters is keyed by encoding name
  @converters[encoding].convert(value)
end
end # class LogStash::Inputs::Jdbc