# encoding: utf-8
require 'logstash/outputs/base'
require 'logstash/namespace'
require 'concurrent'
require 'stud/interval'
require 'java'
require 'logstash-output-charrington_jars'
require 'json'
require 'bigdecimal'
require File.join(File.dirname(__FILE__), "charrington/process")
require File.join(File.dirname(__FILE__), "charrington/transform_postgres")
require File.join(File.dirname(__FILE__), "charrington/transform_redshift")
require File.join(File.dirname(__FILE__), "charrington/insert")

# Write events to a SQL engine, using JDBC.
# It is up to the user of the plugin to correctly configure the plugin.
# This class is responsible for setting things up, creating the connection,
# and handling retries. Charrington::Insert is where the insert
# is attempted. If that fails, it will try to either
# create a table via Charrington::CreateTable
# or alter an existing one via Charrington::AlterTable
class LogStash::Outputs::Charrington < LogStash::Outputs::Base
  concurrency :shared

  config_name 'charrington'

  STRFTIME_FMT = '%Y-%m-%d %T.%L'.freeze

  # Driver class - Reintroduced for https://github.com/theangryangel/logstash-output-jdbc/issues/26
  config :driver_class, validate: :string

  # Does the JDBC driver support autocommit?
  config :driver_auto_commit, validate: :boolean, default: true, required: true

  # Where to find the jar
  # Defaults to not required, and to the original behaviour
  config :driver_jar_path, validate: :string, required: false

  # jdbc connection string
  config :connection_string, validate: :string, required: true

  # jdbc username - optional, maybe in the connection string
  config :username, validate: :string, required: false

  # jdbc password - optional, maybe in the connection string
  config :password, validate: :string, required: false

  # Number of connections in the pool to maintain
  config :max_pool_size, validate: :number, default: 5

  # Connection timeout (handed to the pool's setConnectionTimeout as-is)
  config :connection_timeout, validate: :number, default: 10000

  # Set initial interval in seconds between retries. Doubled on each retry up to `retry_max_interval`
  config :retry_initial_interval, validate: :number, default: 2

  # Maximum time between retries, in seconds
  config :retry_max_interval, validate: :number, default: 128

  # Any additional custom, retryable SQL state codes.
  # Suitable for configuring retryable custom JDBC SQL state codes.
  config :retry_sql_states, validate: :array, default: []

  # Run a connection test on start.
  config :connection_test, validate: :boolean, default: true

  # Optional query installed on the pool as both its connection test query
  # and its connection init SQL (see setup_and_test_pool!).
  config :connection_test_query, validate: :string, required: false

  # Maximum number of sequential failed attempts, before we stop retrying.
  # If set to < 1, then it will infinitely retry.
  # At the default values this is a little over 10 minutes
  config :max_flush_exceptions, validate: :number, default: 10

  config :max_repeat_exceptions, obsolete: 'This has been replaced by max_flush_exceptions - which behaves slightly differently. Please check the documentation.'

  config :max_repeat_exceptions_time, obsolete: 'This is no longer required'

  config :idle_flush_time, obsolete: 'No longer necessary under Logstash v5'

  # Allows the whole event to be converted to JSON
  config :enable_event_as_json_keyword, validate: :boolean, default: false

  # The magic key used to convert the whole event to JSON. If you need this, and you have the default in your events, you can use this to change your magic keyword.
  config :event_as_json_keyword, validate: :string, default: '@event'

  # The database schema
  config :schema, validate: :string, required: false

  # Plugin start-up hook: loads the JDBC driver jar(s), creates the
  # "stopping" flag and builds (and optionally tests) the connection pool.
  def register
    @logger.info('JDBC - Starting up')

    load_jar_files!

    @stopping = Concurrent::AtomicBoolean.new(false)

    setup_and_test_pool!
  end

  # Hands each event to Charrington::Process together with a pooled connection.
  #
  # A fresh connection is taken from the pool per event and returned in the
  # `ensure`. If no connection can be obtained, iteration stops and the
  # remaining events in this batch are not processed. Any StandardError raised
  # while processing a single event is logged and that event is dropped.
  #
  # @param events [Array] batch of events supplied by the pipeline
  def multi_receive(events)
    events.each do |event|
      connection = get_connection
      break unless connection

      schema = get_schema(event)
      opts = {
        connection: connection,
        schema: schema,
        max_retries: @max_flush_exceptions,
        retry_initial_interval: @retry_initial_interval,
        driver: driver
      }
      Charrington::Process.call(connection, event, opts)
    rescue => e
      @logger.error("Unable to process event. Event dropped. #{e.message}")
    ensure
      # get_connection returns false (not nil) on failure, so test truthiness:
      # a nil? check would attempt `false.close` and raise NoMethodError when
      # the `break` above triggers this ensure.
      connection.close if connection
    end
  end

  # Plugin shutdown hook: flags the plugin as stopping and closes the pool.
  def close
    @stopping.make_true
    @pool.close
    super
  end

  private

  # Name of the SQL dialect passed to Charrington::Process, derived from the
  # configured driver class.
  #
  # @return [String] "redshift" when the driver class matches /redshift/,
  #   otherwise "postgres" (including when no driver class is configured)
  def driver
    case @driver_class
    when /redshift/
      "redshift"
    else
      "postgres"
    end
  end

  # Builds the HikariCP data source from the plugin configuration and, when
  # `connection_test` is enabled, borrows one connection to check validity.
  def setup_and_test_pool!
    @pool = Java::ComZaxxerHikari::HikariDataSource.new

    @pool.setDriverClassName(@driver_class) if @driver_class
    @pool.setUsername(@username) if @username
    @pool.setPassword(@password) if @password
    @pool.setMaximumPoolSize(@max_pool_size)
    @pool.setConnectionTimeout(@connection_timeout)
    @pool.setAutoCommit(@driver_auto_commit)
    @pool.setJdbcUrl(@connection_string)

    # Half of the connection timeout after dividing by 1000; used as the
    # isValid timeout below.
    validate_connection_timeout = (@connection_timeout / 1000) / 2

    if !@connection_test_query.nil? && @connection_test_query.length > 1
      @pool.setConnectionTestQuery(@connection_test_query)
      @pool.setConnectionInitSql(@connection_test_query)
    end

    return unless @connection_test

    # Test connection
    test_connection = @pool.getConnection
    begin
      unless test_connection.isValid(validate_connection_timeout)
        @logger.warn('JDBC - Connection is not reporting as validate. Either connection is invalid, or driver is not getting the appropriate response.')
      end
    ensure
      # Always hand the connection back, even if isValid raises.
      test_connection.close
    end
  end

  # Load jar from driver path
  #
  # Requires the jar at `driver_jar_path` when one is configured; otherwise
  # requires every jar found under vendor/jar/jdbc.
  #
  # @raise [LogStash::ConfigurationError] if the configured jar does not exist
  #   or no jars are found in the vendor directory
  def load_jar_files!
    unless @driver_jar_path.nil?
      raise LogStash::ConfigurationError, 'JDBC - Could not find jar file at given path. Check config.' unless File.exist? @driver_jar_path
      require @driver_jar_path
      return
    end

    # Revert original behaviour of loading from vendor directory
    # if no path given
    jarpath = if ENV['LOGSTASH_HOME']
                File.join(ENV['LOGSTASH_HOME'], '/vendor/jar/jdbc/*.jar')
              else
                File.join(File.dirname(__FILE__), '../../../vendor/jar/jdbc/*.jar')
              end

    @logger.trace('JDBC - jarpath', path: jarpath)

    jars = Dir[jarpath]
    raise LogStash::ConfigurationError, 'JDBC - No jars found. Have you read the README?' if jars.empty?

    jars.each do |jar|
      @logger.trace('JDBC - Loaded jar', jar: jar)
      require jar
    end
  end

  # Picks the schema for an event: the configured `schema` when set,
  # otherwise a name derived from the event's "app_name" field.
  #
  # @param event [#to_hash] the event being written
  # @return [String] schema name; "" when `schema` is unset and "app_name"
  #   is neither "Web App" nor "Mobile App"
  def get_schema(event)
    return @schema unless @schema.nil?

    case event.to_hash["app_name"]
    when "Web App"
      "dea_webapp"
    when "Mobile App"
      "dea_mobileapp"
    else
      ""
    end
  end

  # Borrows a connection from the pool.
  #
  # @return [Object, false] the pooled connection, or false when the pool
  #   raised (the exception is logged via log_jdbc_exception)
  def get_connection
    @pool.getConnection
  rescue => e
    log_jdbc_exception(e, true, nil)
    false
  end

  # Logs an exception and, for exceptions that respond to getNextException,
  # every chained exception after it.
  #
  # @param exception [Exception] first exception to log
  # @param retrying [Boolean] true logs at warn level with "Retrying",
  #   false logs at error level with "Not retrying"
  # @param event [Object, nil] event attached to each log entry
  def log_jdbc_exception(exception, retrying, event)
    current_exception = exception
    log_text = 'JDBC - Exception. ' + (retrying ? 'Retrying' : 'Not retrying')
    log_method = (retrying ? 'warn' : 'error')

    loop do
      @logger.send(log_method, log_text, :exception => current_exception, :event => event)

      current_exception =
        if current_exception.respond_to? 'getNextException'
          current_exception.getNextException()
        end

      break if current_exception.nil?
    end
  end
end