lib/logstash/outputs/charrington.rb in logstash-output-charrington-0.3.24 vs lib/logstash/outputs/charrington.rb in logstash-output-charrington-0.3.25

- old
+ new

@@ -1,33 +1,33 @@ -# encoding: utf-8 +# frozen_string_literal: true + require 'logstash/outputs/base' require 'logstash/namespace' require 'concurrent' require 'stud/interval' require 'java' require 'logstash-output-charrington_jars' require 'json' require 'bigdecimal' -require File.join(File.dirname(__FILE__), "charrington/process") -require File.join(File.dirname(__FILE__), "charrington/transform_postgres") -require File.join(File.dirname(__FILE__), "charrington/transform_redshift") -require File.join(File.dirname(__FILE__), "charrington/insert") +require File.join(File.dirname(__FILE__), 'charrington/process') +require File.join(File.dirname(__FILE__), 'charrington/transform_postgres') +require File.join(File.dirname(__FILE__), 'charrington/transform_redshift') +require File.join(File.dirname(__FILE__), 'charrington/insert') # Write events to a SQL engine, using JDBC. # It is upto the user of the plugin to correctly configure the plugin. # This class is responsible for setting things up, creating the connection, # and handling retries. Charrington::Insert is where the insert # is attempted. If that fails, it will try to either # create a table via Charrington::CreateTable # or alter an existing one via Charrington::AlterTable - -class LogStash::Outputs::Charrington < LogStash::Outputs::Base +class LogStash::Outputs::Charrington < LogStash::Outputs::Base # rubocop:disable Metrics/ClassLength concurrency :shared config_name 'charrington' - STRFTIME_FMT = '%Y-%m-%d %T.%L'.freeze + STRFTIME_FMT = '%Y-%m-%d %T.%L' # Driver class - Reintroduced for https://github.com/theangryangel/logstash-output-jdbc/issues/26 config :driver_class, validate: :string # Does the JDBC driver support autocommit? @@ -96,27 +96,27 @@ setup_and_test_pool! end def multi_receive(events) events.each do |event| - connection = get_connection - break unless connection + conn = connection + break unless conn schema = get_schema(event) - opts = { connection: connection, + opts = { connection: conn, schema: schema, max_retries: @max_flush_exceptions, retry_initial_interval: @retry_initial_interval, driver: driver, transformer: @transformer } - Charrington::Process.call(connection, event, opts) - rescue => e + Charrington::Process.call(conn, event, opts) + rescue StandardError => e @logger.error("Unable to process event. Event dropped. #{e.message}") next ensure - connection.close unless connection.nil? + conn&.close end end def close @stopping.make_true @@ -127,13 +127,13 @@ private def driver case @driver_class when /redshift/ - "redshift" + 'redshift' else - "postgres" + 'postgres' end end def setup_and_test_pool! @pool = Java::ComZaxxerHikari::HikariDataSource.new @@ -145,29 +145,28 @@ @pool.setAutoCommit(@driver_auto_commit) @pool.setJdbcUrl(@connection_string) validate_connection_timeout = (@connection_timeout / 1000) / 2 - if !@connection_test_query.nil? and @connection_test_query.length > 1 + if !@connection_test_query.nil? && (@connection_test_query.length > 1) @pool.setConnectionTestQuery(@connection_test_query) @pool.setConnectionInitSql(@connection_test_query) end return unless @connection_test # Test connection test_connection = @pool.getConnection - unless test_connection.isValid(validate_connection_timeout) - @logger.warn('JDBC - Connection is not reporting as validate. Either connection is invalid, or driver is not getting the appropriate response.') - end + @logger.warn('JDBC - Connection is not reporting as validate. Either connection is invalid, or driver is not getting the appropriate response.') unless test_connection.isValid(validate_connection_timeout) test_connection.close end # Load jar from driver path def load_jar_files! unless @driver_jar_path.nil? raise LogStash::ConfigurationError, 'JDBC - Could not find jar file at given path. Check config.' unless File.exist? @driver_jar_path + require @driver_jar_path return end # Revert original behaviour of loading from vendor directory if no path given @@ -190,42 +189,38 @@ def get_schema(event) if !@schema.nil? @schema else - case event.to_hash["app_name"] - when "Web App" - "dea_webapp" - when "Mobile App" - "dea_mobileapp" + case event.to_hash['app_name'] + when 'Web App' + 'dea_webapp' + when 'Mobile App' + 'dea_mobileapp' else - "" + '' end end end - def get_connection - connection = @pool.getConnection - rescue => e + def connection + @pool.getConnection + rescue StandardError => e log_jdbc_exception(e, true, nil) false end def log_jdbc_exception(exception, retrying, event) current_exception = exception - log_text = 'JDBC - Exception. ' + (retrying ? 'Retrying' : 'Not retrying') + log_text = "JDBC - Exception. #{retrying ? 'Retrying' : 'Not retrying'}" log_method = (retrying ? 'warn' : 'error') loop do - @logger.send(log_method, log_text, :exception => current_exception, :event => event) + @logger.send(log_method, log_text, exception: current_exception, event: event) - if current_exception.respond_to? 'getNextException' - current_exception = current_exception.getNextException() - else - current_exception = nil - end + current_exception = (current_exception.getNextException if current_exception.respond_to? 'getNextException') - break if current_exception == nil + break if current_exception.nil? end end end