lib/logstash/outputs/charrington.rb in logstash-output-charrington-0.3.24 vs lib/logstash/outputs/charrington.rb in logstash-output-charrington-0.3.25
- old
+ new
@@ -1,33 +1,33 @@
-# encoding: utf-8
+# frozen_string_literal: true
+
require 'logstash/outputs/base'
require 'logstash/namespace'
require 'concurrent'
require 'stud/interval'
require 'java'
require 'logstash-output-charrington_jars'
require 'json'
require 'bigdecimal'
-require File.join(File.dirname(__FILE__), "charrington/process")
-require File.join(File.dirname(__FILE__), "charrington/transform_postgres")
-require File.join(File.dirname(__FILE__), "charrington/transform_redshift")
-require File.join(File.dirname(__FILE__), "charrington/insert")
+require File.join(File.dirname(__FILE__), 'charrington/process')
+require File.join(File.dirname(__FILE__), 'charrington/transform_postgres')
+require File.join(File.dirname(__FILE__), 'charrington/transform_redshift')
+require File.join(File.dirname(__FILE__), 'charrington/insert')
# Write events to a SQL engine, using JDBC.
# It is upto the user of the plugin to correctly configure the plugin.
# This class is responsible for setting things up, creating the connection,
# and handling retries. Charrington::Insert is where the insert
# is attempted. If that fails, it will try to either
# create a table via Charrington::CreateTable
# or alter an existing one via Charrington::AlterTable
-
-class LogStash::Outputs::Charrington < LogStash::Outputs::Base
+class LogStash::Outputs::Charrington < LogStash::Outputs::Base # rubocop:disable Metrics/ClassLength
concurrency :shared
config_name 'charrington'
- STRFTIME_FMT = '%Y-%m-%d %T.%L'.freeze
+ STRFTIME_FMT = '%Y-%m-%d %T.%L'
# Driver class - Reintroduced for https://github.com/theangryangel/logstash-output-jdbc/issues/26
config :driver_class, validate: :string
# Does the JDBC driver support autocommit?
@@ -96,27 +96,27 @@
setup_and_test_pool!
end
def multi_receive(events)
events.each do |event|
- connection = get_connection
- break unless connection
+ conn = connection
+ break unless conn
schema = get_schema(event)
- opts = { connection: connection,
+ opts = { connection: conn,
schema: schema,
max_retries: @max_flush_exceptions,
retry_initial_interval: @retry_initial_interval,
driver: driver,
transformer: @transformer }
- Charrington::Process.call(connection, event, opts)
- rescue => e
+ Charrington::Process.call(conn, event, opts)
+ rescue StandardError => e
@logger.error("Unable to process event. Event dropped. #{e.message}")
next
ensure
- connection.close unless connection.nil?
+ conn&.close
end
end
def close
@stopping.make_true
@@ -127,13 +127,13 @@
private
def driver
case @driver_class
when /redshift/
- "redshift"
+ 'redshift'
else
- "postgres"
+ 'postgres'
end
end
def setup_and_test_pool!
@pool = Java::ComZaxxerHikari::HikariDataSource.new
@@ -145,29 +145,28 @@
@pool.setAutoCommit(@driver_auto_commit)
@pool.setJdbcUrl(@connection_string)
validate_connection_timeout = (@connection_timeout / 1000) / 2
- if !@connection_test_query.nil? and @connection_test_query.length > 1
+ if !@connection_test_query.nil? && (@connection_test_query.length > 1)
@pool.setConnectionTestQuery(@connection_test_query)
@pool.setConnectionInitSql(@connection_test_query)
end
return unless @connection_test
# Test connection
test_connection = @pool.getConnection
- unless test_connection.isValid(validate_connection_timeout)
- @logger.warn('JDBC - Connection is not reporting as validate. Either connection is invalid, or driver is not getting the appropriate response.')
- end
+ @logger.warn('JDBC - Connection is not reporting as validate. Either connection is invalid, or driver is not getting the appropriate response.') unless test_connection.isValid(validate_connection_timeout)
test_connection.close
end
# Load jar from driver path
def load_jar_files!
unless @driver_jar_path.nil?
raise LogStash::ConfigurationError, 'JDBC - Could not find jar file at given path. Check config.' unless File.exist? @driver_jar_path
+
require @driver_jar_path
return
end
# Revert original behaviour of loading from vendor directory if no path given
@@ -190,42 +189,38 @@
def get_schema(event)
if !@schema.nil?
@schema
else
- case event.to_hash["app_name"]
- when "Web App"
- "dea_webapp"
- when "Mobile App"
- "dea_mobileapp"
+ case event.to_hash['app_name']
+ when 'Web App'
+ 'dea_webapp'
+ when 'Mobile App'
+ 'dea_mobileapp'
else
- ""
+ ''
end
end
end
- def get_connection
- connection = @pool.getConnection
- rescue => e
+ def connection
+ @pool.getConnection
+ rescue StandardError => e
log_jdbc_exception(e, true, nil)
false
end
def log_jdbc_exception(exception, retrying, event)
current_exception = exception
- log_text = 'JDBC - Exception. ' + (retrying ? 'Retrying' : 'Not retrying')
+ log_text = "JDBC - Exception. #{retrying ? 'Retrying' : 'Not retrying'}"
log_method = (retrying ? 'warn' : 'error')
loop do
- @logger.send(log_method, log_text, :exception => current_exception, :event => event)
+ @logger.send(log_method, log_text, exception: current_exception, event: event)
- if current_exception.respond_to? 'getNextException'
- current_exception = current_exception.getNextException()
- else
- current_exception = nil
- end
+ current_exception = (current_exception.getNextException if current_exception.respond_to? 'getNextException')
- break if current_exception == nil
+ break if current_exception.nil?
end
end
end