lib/logstash/outputs/charrington.rb in logstash-output-charrington-0.1.1 vs lib/logstash/outputs/charrington.rb in logstash-output-charrington-0.2.0

- old
+ new

@@ -5,37 +5,44 @@
 require 'stud/interval'
 require 'java'
 require 'logstash-output-charrington_jars'
 require 'json'
 require 'bigdecimal'
+require 'pry'
+require File.join(File.dirname(__FILE__), "charrington/process")
+require File.join(File.dirname(__FILE__), "charrington/transform")
+require File.join(File.dirname(__FILE__), "charrington/insert")

 # Write events to a SQL engine, using JDBC.
-#
-# It is upto the user of the plugin to correctly configure the plugin. This
-# includes correctly crafting the SQL statement, and matching the number of
-# parameters correctly.
-#
+# It is upto the user of the plugin to correctly configure the plugin.
+
+# This class is responsible for setting things up, creating the connection,
+# and handling retries. Charrington::Insert is where the insert
+# is attempted. If that fails, it will try to either
+# create a table via Charrington::CreateTable
+# or alter an existing one via Charrington::AlterTable
+
 class LogStash::Outputs::Charrington < LogStash::Outputs::Base
   concurrency :shared

   STRFTIME_FMT = '%Y-%m-%d %T.%L'.freeze

-  RETRYABLE_SQLSTATE_CLASSES = [
-    # Classes of retryable SQLSTATE codes
-    # Not all in the class will be retryable. However, this is the best that
-    # we've got right now.
-    # If a custom state code is required, set it in retry_sql_states.
-    '08', # Connection Exception
-    '24', # Invalid Cursor State (Maybe retry-able in some circumstances)
-    '25', # Invalid Transaction State
-    '40', # Transaction Rollback
-    '53', # Insufficient Resources
-    '54', # Program Limit Exceeded (MAYBE)
-    '55', # Object Not In Prerequisite State
-    '57', # Operator Intervention
-    '58', # System Error
-  ].freeze
+  # RETRYABLE_SQLSTATE_CLASSES = [
+  #   # Classes of retryable SQLSTATE codes
+  #   # Not all in the class will be retryable. However, this is the best that
+  #   # we've got right now.
+  #   # If a custom state code is required, set it in retry_sql_states.
+  #   '08', # Connection Exception
+  #   '24', # Invalid Cursor State (Maybe retry-able in some circumstances)
+  #   '25', # Invalid Transaction State
+  #   '40', # Transaction Rollback
+  #   '53', # Insufficient Resources
+  #   '54', # Program Limit Exceeded (MAYBE)
+  #   '55', # Object Not In Prerequisite State
+  #   '57', # Operator Intervention
+  #   '58', # System Error
+  # ].freeze

   config_name 'charrington'

   # Driver class - Reintroduced for https://github.com/theangryangel/logstash-output-jdbc/issues/26
   config :driver_class, validate: :string
@@ -54,30 +61,16 @@
   config :username, validate: :string, required: false

   # jdbc password - optional, maybe in the connection string
   config :password, validate: :string, required: false

-  # [ "insert into table (message) values(?)", "%{message}" ]
-  config :statement, validate: :array, required: true
-
-  # If this is an unsafe statement, use event.sprintf
-  # This also has potential performance penalties due to having to create a
-  # new statement for each event, rather than adding to the batch and issuing
-  # multiple inserts in 1 go
-  config :unsafe_statement, validate: :boolean, default: false
-
   # Number of connections in the pool to maintain
   config :max_pool_size, validate: :number, default: 5

   # Connection timeout
   config :connection_timeout, validate: :number, default: 10000

-  # We buffer a certain number of events before flushing that out to SQL.
-  # This setting controls how many events will be buffered before sending a
-  # batch of events.
-  config :flush_size, validate: :number, default: 1000
-
   # Set initial interval in seconds between retries. Doubled on each retry up to `retry_max_interval`
   config :retry_initial_interval, validate: :number, default: 2

   # Maximum time between retries, in seconds
   config :retry_max_interval, validate: :number, default: 128
@@ -92,11 +85,10 @@
   config :connection_test_query, validate: :string, required: false

   # Maximum number of sequential failed attempts, before we stop retrying.
   # If set to < 1, then it will infinitely retry.
   # At the default values this is a little over 10 minutes
-
   config :max_flush_exceptions, validate: :number, default: 10

   config :max_repeat_exceptions, obsolete: 'This has been replaced by max_flush_exceptions - which behaves slightly differently. Please check the documentation.'
   config :max_repeat_exceptions_time, obsolete: 'This is no longer required'
   config :idle_flush_time, obsolete: 'No longer necessary under Logstash v5'
@@ -112,29 +104,27 @@
   def register
     @logger.info('JDBC - Starting up')
     load_jar_files!

-    @stopping = Concurrent::AtomicBoolean.new(false)
-    @logger.warn('JDBC - Flush size is set to > 1000') if @flush_size > 1000
-
-    if @statement.empty?
-      @logger.error('JDBC - No statement provided. Configuration error.')
-    end
-
-    if !@unsafe_statement && @statement.length < 2
-      @logger.error("JDBC - Statement has no parameters. No events will be inserted into SQL as you're not passing any event data. Likely configuration error.")
-    end
-
     setup_and_test_pool!
   end

   def multi_receive(events)
-    events.each_slice(@flush_size) do |slice|
-      retrying_submit(slice)
+    events.each do |event|
+      connection = get_connection
+      break unless connection
+
+      opts = { connection: connection,
+               schema: @schema,
+               max_retries: @max_flush_exceptions,
+               retry_initial_interval: @retry_initial_interval }
+
+      Charrington::Process.call(connection, event, opts)
+      connection.close unless connection.nil?
     end
   end

   def close
     @stopping.make_true
@@ -143,23 +133,18 @@
   end

   private

   def setup_and_test_pool!
-    # Setup pool
     @pool = Java::ComZaxxerHikari::HikariDataSource.new
-
-    @pool.setAutoCommit(@driver_auto_commit)
     @pool.setDriverClassName(@driver_class) if @driver_class
-
-    @pool.setJdbcUrl(@connection_string)
-
     @pool.setUsername(@username) if @username
     @pool.setPassword(@password) if @password
-
     @pool.setMaximumPoolSize(@max_pool_size)
     @pool.setConnectionTimeout(@connection_timeout)
+    @pool.setAutoCommit(@driver_auto_commit)
+    @pool.setJdbcUrl(@connection_string)

     validate_connection_timeout = (@connection_timeout / 1000) / 2

     if !@connection_test_query.nil? and @connection_test_query.length > 1
       @pool.setConnectionTestQuery(@connection_test_query)
@@ -174,20 +159,19 @@
       @logger.warn('JDBC - Connection is not reporting as validate. Either connection is invalid, or driver is not getting the appropriate response.')
     end
     test_connection.close
   end

+  # Load jar from driver path
   def load_jar_files!
-    # Load jar from driver path
     unless @driver_jar_path.nil?
       raise LogStash::ConfigurationError, 'JDBC - Could not find jar file at given path. Check config.' unless File.exist? @driver_jar_path
       require @driver_jar_path
       return
     end

-    # Revert original behaviour of loading from vendor directory
-    # if no path given
+    # Revert original behaviour of loading from vendor directory if no path given
     jarpath = if ENV['LOGSTASH_HOME']
                 File.join(ENV['LOGSTASH_HOME'], '/vendor/jar/jdbc/*.jar')
               else
                 File.join(File.dirname(__FILE__), '../../../vendor/jar/jdbc/*.jar')
               end
@@ -201,228 +185,33 @@
       @logger.trace('JDBC - Loaded jar', jar: jar)
       require jar
     end
   end

-  def create_statement(event)
-    event = event.to_hash
-    hashed = event.delete_if {|k, _v| k.start_with?("@") || k == 'host' || k == 'path' }
-
-    columns = '(' + hashed.keys.join(', ') + ')'
-
-    value_placeholders = ('?' * hashed.length).split('')
-    values = '(' + value_placeholders.join(', ') + ')'
-
-    table_name = create_table_name(event)
-    return "INSERT INTO #{table_name} #{columns} VALUES #{values}", hashed.keys
+  def get_connection
+    connection = @pool.getConnection
+  rescue => e
+    log_jdbc_exception(e, true, nil)
+    false
   end

-  def create_table_name(event)
-    raise TableNameNil.new("Table name is nil", event) if event.nil?
-
-    event = event.to_hash["event"].to_s.strip
-    raise TableNameNil.new("Table name is nil", event) if event.empty?
-
-    schema = @schema.empty? ? '' : "#{@schema}."
-    "#{schema}#{event.gsub(/[ \-_]+/, "_").downcase}"
-  end
-
-  def prepared_statement(keys)
-    keys.map do |key|
-      turn_into_wrapped(key)
-    end
-  end
-
-  def turn_into_wrapped(key)
-    "[#{key}]"
-  end
-
-  def submit(events)
-    connection = nil
-    statement = nil
-    events_to_retry = []
-
-    begin
-      connection = @pool.getConnection
-    rescue => e
-      log_jdbc_exception(e, true, nil)
-      # If a connection is not available, then the server has gone away
-      # We're not counting that towards our retry count.
-      return events, false
-    end
-
-    events.each do |event|
-
-
-      begin
-        ins, columns = create_statement(event)
-        keys_we_care_about = prepared_statement(columns)
-        statement = connection.prepareStatement(
-          ins
-        )
-        statement = add_statement_event_params(statement, event, keys_we_care_about)
-        statement.execute
-      rescue TableNameNil => e
-        @logger.error("#{e.message} event=#{e.event}")
-      rescue => e
-        @logger.error "Rescue from SQLException #{e.message}"
-        create_statement = make_create_statement(event, columns)
-        puts 'create_statement'
-        puts create_statement
-
-        statement = connection.prepareStatement(
-          create_statement
-        )
-        statement.execute
-        @logger.debug('Created new Table.')
-        events_to_retry.push(event)
-      ensure
-        statement.close unless statement.nil?
-      end
-    end
-
-    connection.close unless connection.nil?
-
-    return events_to_retry, true
-  end
-
-  def retrying_submit(actions)
-    # Initially we submit the full list of actions
-    submit_actions = actions
-    count_as_attempt = true
-
-    attempts = 1
-
-    sleep_interval = @retry_initial_interval
-    while @stopping.false? and (submit_actions and !submit_actions.empty?)
-      return if !submit_actions || submit_actions.empty? # If everything's a success we move along
-      # We retry whatever didn't succeed
-      submit_actions, count_as_attempt = submit(submit_actions)
-
-      # Everything was a success!
-      break if !submit_actions || submit_actions.empty?
-
-      if @max_flush_exceptions > 0 and count_as_attempt == true
-        attempts += 1
-
-        if attempts > @max_flush_exceptions
-          @logger.error("JDBC - max_flush_exceptions has been reached. #{submit_actions.length} events have been unable to be sent to SQL and are being dropped. See previously logged exceptions for details.")
-          break
-        end
-      end
-
-      # If we're retrying the action sleep for the recommended interval
-      # Double the interval for the next time through to achieve exponential backoff
-      Stud.stoppable_sleep(sleep_interval) { @stopping.true? }
-      sleep_interval = next_sleep_interval(sleep_interval)
-    end
-  end
-
-  def make_create_statement(event, keys_we_care_about)
-    columns = []
-
-    keys_we_care_about.each_with_index do |key, idx|
-      wrapped = turn_into_wrapped(key)
-
-      case event.get(wrapped)
-      when Time, LogStash::Timestamp
-        columns << "#{key} TIMESTAMP"
-      when Integer
-        columns << "#{key} BIGINT"
-      when BigDecimal
-        columns << "#{key} DECIMAL"
-      when Float
-        columns << "#{key} DOUBLE PRECISION"
-      when String, Array, Hash
-        columns << "#{key} VARCHAR"
-      when true, false
-        columns << "#{key} BOOLEAN"
-      end
-    end
-
-    "CREATE TABLE IF NOT EXISTS #{create_table_name(event)} (#{columns.join(', ')})"
-  end
-
-  def add_statement_event_params(statement, event, keys_we_care_about)
-    keys_we_care_about.each_with_index do |key, idx|
-      if @enable_event_as_json_keyword == true and key.is_a? String and key == @event_as_json_keyword
-        value = event.to_json
-      elsif key.is_a? String
-        value = event.get(key)
-        if value.nil? and key =~ /%\{/
-          value = event.sprintf(key)
-        end
-      else
-        value = key
-      end
-
-      case value
-      when Time
-        statement.setString(idx + 1, value.strftime(STRFTIME_FMT))
-      when LogStash::Timestamp
-        statement.setString(idx + 1, value.time.strftime(STRFTIME_FMT))
-      when Integer
-        if value > 2147483647 or value < -2147483648
-          statement.setLong(idx + 1, value)
-        else
-          statement.setInt(idx + 1, value)
-        end
-      when BigDecimal
-        statement.setBigDecimal(idx + 1, value.to_java)
-      when Float
-        statement.setFloat(idx + 1, value)
-      when String
-        statement.setString(idx + 1, value)
-      when Array, Hash
-        statement.setString(idx + 1, value.to_json)
-      when true, false
-        statement.setBoolean(idx + 1, value)
-      else
-        statement.setString(idx + 1, nil)
-      end
-    end
-
-    statement
-  end
-
-  def retry_exception?(exception, event)
-    retrying = (exception.respond_to? 'getSQLState' and (RETRYABLE_SQLSTATE_CLASSES.include?(exception.getSQLState.to_s[0,2]) or @retry_sql_states.include?(exception.getSQLState)))
-    log_jdbc_exception(exception, retrying, event)
-
-    retrying
-  end
-
   def log_jdbc_exception(exception, retrying, event)
     current_exception = exception
     log_text = 'JDBC - Exception. ' + (retrying ? 'Retrying' : 'Not retrying')
     log_method = (retrying ? 'warn' : 'error')

     loop do
       # TODO reformat event output so that it only shows the fields necessary.
-      @logger.send(log_method, log_text, :exception => current_exception, :statement => @statement[0], :event => event)
+      @logger.send(log_method, log_text, :exception => current_exception, :event => event)

       if current_exception.respond_to? 'getNextException'
         current_exception = current_exception.getNextException()
       else
         current_exception = nil
       end

       break if current_exception == nil
     end
-  end
-
-  def next_sleep_interval(current_interval)
-    doubled = current_interval * 2
-    doubled > @retry_max_interval ? @retry_max_interval : doubled
-  end
-end # class LogStash::Outputs::Charrington
-
-class TableNameNil < StandardError
-  attr_reader :event
-
-  def initialize(msg='Table name is nil', event={})
-    @event = event
-    super(msg)
   end
 end