lib/logstash/inputs/jdbc.rb in logstash-input-jdbc-4.0.2 vs lib/logstash/inputs/jdbc.rb in logstash-input-jdbc-4.1.0

- old
+ new

@@ -158,10 +158,30 @@
   config :record_last_run, :validate => :boolean, :default => true

   # Whether to force the lowercasing of identifier fields
   config :lowercase_column_names, :validate => :boolean, :default => true

+  # The character encoding of all columns, leave empty if the columns are already properly UTF-8
+  # encoded. Specific columns charsets using :columns_charset can override this setting.
+  config :charset, :validate => :string
+
+  # The character encoding for specific columns. This option will override the `:charset` option
+  # for the specified columns.
+  #
+  # Example:
+  # [source,ruby]
+  # ----------------------------------
+  # input {
+  #   jdbc {
+  #     ...
+  #     columns_charset => { "column0" => "ISO-8859-1" }
+  #     ...
+  #   }
+  # }
+  # this will only convert column0 that has ISO-8859-1 as an original encoding.
+  config :columns_charset, :validate => :hash, :default => {}
+
   public

   def register
     require "rufus/scheduler"
     prepare_jdbc_connection
@@ -171,10 +191,12 @@
       if @tracking_column.nil?
         raise(LogStash::ConfigurationError, "Must set :tracking_column if :use_column_value is true.")
       end
     end

+    @enable_encoding = !@charset.nil? || !@columns_charset.empty?
+
     # load sql_last_value from file if exists
     if @clean_run && File.exist?(@last_run_metadata_path)
       File.delete(@last_run_metadata_path)
     elsif File.exist?(@last_run_metadata_path)
       @sql_last_value = YAML.load(File.read(@last_run_metadata_path))
@@ -189,10 +211,18 @@
     if (@jdbc_password_filepath and @jdbc_password)
       raise(LogStash::ConfigurationError, "Only one of :jdbc_password, :jdbc_password_filepath may be set at a time.")
     end

     @jdbc_password = File.read(@jdbc_password_filepath).strip if @jdbc_password_filepath
+
+    if enable_encoding?
+      @converters = {}
+      @columns_charset.each do |column_name, encoding|
+        @converters[encoding] = LogStash::Util::Charset.new(encoding)
+      end
+      @converters[@charset] = LogStash::Util::Charset.new(@charset) if @charset
+    end
   end # def register

   def run(queue)
     if @schedule
       @scheduler = Rufus::Scheduler.new(:max_work_threads => 1)
@@ -218,10 +248,14 @@
   def execute_query(queue)
     # update default parameters
     @parameters['sql_last_value'] = @sql_last_value

     execute_statement(@statement, @parameters) do |row|
+      if enable_encoding?
+        ## do the necessary conversions to string elements
+        row = Hash[row.map { |k, v| [k.to_s, convert(k, v)] }]
+      end
       event = LogStash::Event.new(row)
       decorate(event)
       queue << event
     end
   end
@@ -230,6 +264,26 @@
     if @record_last_run
       File.write(@last_run_metadata_path, YAML.dump(@sql_last_value))
     end
   end

+  private
+
+  def enable_encoding?
+    @enable_encoding
+  end
+
+  # make sure the encoding is uniform over fields
+  def convert(column_name, value)
+    return value unless value.is_a?(String)
+    column_charset = @columns_charset[column_name]
+    if column_charset
+      converter = @converters[column_charset]
+      converter.convert(value)
+    elsif @charset
+      converter = @converters[@charset]
+      converter.convert(value)
+    else
+      value
+    end
+  end
 end # class LogStash::Inputs::Jdbc