lib/logstash/outputs/charrington.rb in logstash-output-charrington-0.1.1 vs lib/logstash/outputs/charrington.rb in logstash-output-charrington-0.2.0
- old
+ new
@@ -5,37 +5,44 @@
require 'stud/interval'
require 'java'
require 'logstash-output-charrington_jars'
require 'json'
require 'bigdecimal'
+require 'pry'
+require File.join(File.dirname(__FILE__), "charrington/process")
+require File.join(File.dirname(__FILE__), "charrington/transform")
+require File.join(File.dirname(__FILE__), "charrington/insert")
# Write events to a SQL engine, using JDBC.
-#
-# It is upto the user of the plugin to correctly configure the plugin. This
-# includes correctly crafting the SQL statement, and matching the number of
-# parameters correctly.
-#
+# It is up to the user of the plugin to correctly configure the plugin.
+
+# This class is responsible for setting things up, creating the connection,
+# and handling retries. Charrington::Insert is where the insert is attempted.
+# If that fails, it will try to either create a table via Charrington::CreateTable
+# or alter an existing one via Charrington::AlterTable.
+
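The Charrington::Process, Charrington::Insert, Charrington::CreateTable and Charrington::AlterTable collaborators referenced above live in the newly required charrington/ files and are not shown in this diff. A minimal sketch of the delegation the comment describes, assuming the collaborators expose a .call interface like the Charrington::Process.call used in multi_receive below; the error class names are illustrative, not the gem's actual API:

# Editor's sketch only -- not part of the 0.2.0 source.
module Charrington
  MissingTable   = Class.new(StandardError)  # hypothetical failure raised by Insert
  MissingColumns = Class.new(StandardError)  # hypothetical failure raised by Insert

  class Process
    def self.call(connection, event, opts = {})
      Insert.call(connection, event, opts)
    rescue MissingTable
      CreateTable.call(connection, event, opts)  # first event for this event type
      Insert.call(connection, event, opts)
    rescue MissingColumns
      AlterTable.call(connection, event, opts)   # event carries columns the table lacks
      Insert.call(connection, event, opts)
    end
  end
end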
class LogStash::Outputs::Charrington < LogStash::Outputs::Base
concurrency :shared
STRFTIME_FMT = '%Y-%m-%d %T.%L'.freeze
- RETRYABLE_SQLSTATE_CLASSES = [
- # Classes of retryable SQLSTATE codes
- # Not all in the class will be retryable. However, this is the best that
- # we've got right now.
- # If a custom state code is required, set it in retry_sql_states.
- '08', # Connection Exception
- '24', # Invalid Cursor State (Maybe retry-able in some circumstances)
- '25', # Invalid Transaction State
- '40', # Transaction Rollback
- '53', # Insufficient Resources
- '54', # Program Limit Exceeded (MAYBE)
- '55', # Object Not In Prerequisite State
- '57', # Operator Intervention
- '58', # System Error
- ].freeze
+ # RETRYABLE_SQLSTATE_CLASSES = [
+ # # Classes of retryable SQLSTATE codes
+ # # Not all in the class will be retryable. However, this is the best that
+ # # we've got right now.
+ # # If a custom state code is required, set it in retry_sql_states.
+ # '08', # Connection Exception
+ # '24', # Invalid Cursor State (Maybe retry-able in some circumstances)
+ # '25', # Invalid Transaction State
+ # '40', # Transaction Rollback
+ # '53', # Insufficient Resources
+ # '54', # Program Limit Exceeded (MAYBE)
+ # '55', # Object Not In Prerequisite State
+ # '57', # Operator Intervention
+ # '58', # System Error
+ # ].freeze
config_name 'charrington'
# Driver class - Reintroduced for https://github.com/theangryangel/logstash-output-jdbc/issues/26
config :driver_class, validate: :string
@@ -54,30 +61,16 @@
config :username, validate: :string, required: false
# jdbc password - optional, maybe in the connection string
config :password, validate: :string, required: false
- # [ "insert into table (message) values(?)", "%{message}" ]
- config :statement, validate: :array, required: true
-
- # If this is an unsafe statement, use event.sprintf
- # This also has potential performance penalties due to having to create a
- # new statement for each event, rather than adding to the batch and issuing
- # multiple inserts in 1 go
- config :unsafe_statement, validate: :boolean, default: false
-
# Number of connections in the pool to maintain
config :max_pool_size, validate: :number, default: 5
# Connection timeout
config :connection_timeout, validate: :number, default: 10000
- # We buffer a certain number of events before flushing that out to SQL.
- # This setting controls how many events will be buffered before sending a
- # batch of events.
- config :flush_size, validate: :number, default: 1000
-
# Set initial interval in seconds between retries. Doubled on each retry up to `retry_max_interval`
config :retry_initial_interval, validate: :number, default: 2
# Maximum time between retries, in seconds
config :retry_max_interval, validate: :number, default: 128
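With these defaults the delay between retries doubles from 2 seconds up to a cap of 128 seconds. The 0.1.1 code implemented this as next_sleep_interval (removed further down in this diff); a one-line equivalent for reference, with the cap passed in rather than read from @retry_max_interval:

# Capped exponential backoff, equivalent to the removed next_sleep_interval helper.
def next_sleep_interval(current_interval, retry_max_interval = 128)
  [current_interval * 2, retry_max_interval].min
end

next_sleep_interval(2)    #=> 4
next_sleep_interval(64)   #=> 128
next_sleep_interval(128)  #=> 128 (stays at the cap)

# Summed over the default max_flush_exceptions of 10 attempts this is roughly
# 2 + 4 + 8 + 16 + 32 + 64 + 128 + 128 + 128 + 128 = 638 seconds, which is the
# "a little over 10 minutes" mentioned in the max_flush_exceptions comment below.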
@@ -92,11 +85,10 @@
config :connection_test_query, validate: :string, required: false
# Maximum number of sequential failed attempts, before we stop retrying.
# If set to < 1, then it will infinitely retry.
# At the default values this is a little over 10 minutes
-
config :max_flush_exceptions, validate: :number, default: 10
config :max_repeat_exceptions, obsolete: 'This has been replaced by max_flush_exceptions - which behaves slightly differently. Please check the documentation.'
config :max_repeat_exceptions_time, obsolete: 'This is no longer required'
config :idle_flush_time, obsolete: 'No longer necessary under Logstash v5'
@@ -112,29 +104,27 @@
def register
@logger.info('JDBC - Starting up')
load_jar_files!
-
@stopping = Concurrent::AtomicBoolean.new(false)
- @logger.warn('JDBC - Flush size is set to > 1000') if @flush_size > 1000
-
- if @statement.empty?
- @logger.error('JDBC - No statement provided. Configuration error.')
- end
-
- if !@unsafe_statement && @statement.length < 2
- @logger.error("JDBC - Statement has no parameters. No events will be inserted into SQL as you're not passing any event data. Likely configuration error.")
- end
-
setup_and_test_pool!
end
def multi_receive(events)
- events.each_slice(@flush_size) do |slice|
- retrying_submit(slice)
+ events.each do |event|
+ connection = get_connection
+ break unless connection
+
+ opts = { connection: connection,
+ schema: @schema,
+ max_retries: @max_flush_exceptions,
+ retry_initial_interval: @retry_initial_interval }
+
+ Charrington::Process.call(connection, event, opts)
+ connection.close unless connection.nil?
end
end
def close
@stopping.make_true
@@ -143,23 +133,18 @@
end
private
def setup_and_test_pool!
- # Setup pool
@pool = Java::ComZaxxerHikari::HikariDataSource.new
-
- @pool.setAutoCommit(@driver_auto_commit)
@pool.setDriverClassName(@driver_class) if @driver_class
-
- @pool.setJdbcUrl(@connection_string)
-
@pool.setUsername(@username) if @username
@pool.setPassword(@password) if @password
-
@pool.setMaximumPoolSize(@max_pool_size)
@pool.setConnectionTimeout(@connection_timeout)
+ @pool.setAutoCommit(@driver_auto_commit)
+ @pool.setJdbcUrl(@connection_string)
validate_connection_timeout = (@connection_timeout / 1000) / 2
if !@connection_test_query.nil? and @connection_test_query.length > 1
@pool.setConnectionTestQuery(@connection_test_query)
@@ -174,20 +159,19 @@
@logger.warn('JDBC - Connection is not reporting as valid. Either connection is invalid, or driver is not getting the appropriate response.')
end
test_connection.close
end
+ # Load jar from driver path
def load_jar_files!
- # Load jar from driver path
unless @driver_jar_path.nil?
raise LogStash::ConfigurationError, 'JDBC - Could not find jar file at given path. Check config.' unless File.exist? @driver_jar_path
require @driver_jar_path
return
end
- # Revert original behaviour of loading from vendor directory
- # if no path given
+ # Revert original behaviour of loading from vendor directory if no path given
jarpath = if ENV['LOGSTASH_HOME']
File.join(ENV['LOGSTASH_HOME'], '/vendor/jar/jdbc/*.jar')
else
File.join(File.dirname(__FILE__), '../../../vendor/jar/jdbc/*.jar')
end
@@ -201,228 +185,33 @@
@logger.trace('JDBC - Loaded jar', jar: jar)
require jar
end
end
- def create_statement(event)
- event = event.to_hash
- hashed = event.delete_if {|k, _v| k.start_with?("@") || k == 'host' || k == 'path' }
-
- columns = '(' + hashed.keys.join(', ') + ')'
-
- value_placeholders = ('?' * hashed.length).split('')
- values = '(' + value_placeholders.join(', ') + ')'
-
- table_name = create_table_name(event)
- return "INSERT INTO #{table_name} #{columns} VALUES #{values}", hashed.keys
+ def get_connection
+ connection = @pool.getConnection
+ rescue => e
+ log_jdbc_exception(e, true, nil)
+ false
end
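get_connection relies on Ruby's method-level rescue: when the pool raises, the rescue body's last expression (false) becomes the method's return value, which is what break unless connection in multi_receive checks. A self-contained illustration of that pattern (FakePool and checkout are illustrative stand-ins, not the plugin's or HikariCP's API):

# Stand-in pool whose checkout always fails, to show the rescue path.
class FakePool
  def get_connection
    raise "pool exhausted"
  end
end

def checkout(pool)
  pool.get_connection
rescue => e
  warn "JDBC - could not get a connection: #{e.message}"
  false                       # the rescue's value is the method's return value
end

checkout(FakePool.new)        #=> false (after printing the warning)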
- def create_table_name(event)
- raise TableNameNil.new("Table name is nil", event) if event.nil?
-
- event = event.to_hash["event"].to_s.strip
- raise TableNameNil.new("Table name is nil", event) if event.empty?
-
- schema = @schema.empty? ? '' : "#{@schema}."
- "#{schema}#{event.gsub(/[ \-_]+/, "_").downcase}"
- end
-
- def prepared_statement(keys)
- keys.map do |key|
- turn_into_wrapped(key)
- end
- end
-
- def turn_into_wrapped(key)
- "[#{key}]"
- end
-
- def submit(events)
- connection = nil
- statement = nil
- events_to_retry = []
-
- begin
- connection = @pool.getConnection
- rescue => e
- log_jdbc_exception(e, true, nil)
- # If a connection is not available, then the server has gone away
- # We're not counting that towards our retry count.
- return events, false
- end
-
- events.each do |event|
-
-
- begin
- ins, columns = create_statement(event)
- keys_we_care_about = prepared_statement(columns)
- statement = connection.prepareStatement(
- ins
- )
- statement = add_statement_event_params(statement, event, keys_we_care_about)
- statement.execute
- rescue TableNameNil => e
- @logger.error("#{e.message} event=#{e.event}")
- rescue => e
- @logger.error "Rescue from SQLException #{e.message}"
- create_statement = make_create_statement(event, columns)
- puts 'create_statement'
- puts create_statement
-
- statement = connection.prepareStatement(
- create_statement
- )
- statement.execute
- @logger.debug('Created new Table.')
- events_to_retry.push(event)
- ensure
- statement.close unless statement.nil?
- end
- end
-
- connection.close unless connection.nil?
-
- return events_to_retry, true
- end
-
- def retrying_submit(actions)
- # Initially we submit the full list of actions
- submit_actions = actions
- count_as_attempt = true
-
- attempts = 1
-
- sleep_interval = @retry_initial_interval
- while @stopping.false? and (submit_actions and !submit_actions.empty?)
- return if !submit_actions || submit_actions.empty? # If everything's a success we move along
- # We retry whatever didn't succeed
- submit_actions, count_as_attempt = submit(submit_actions)
-
- # Everything was a success!
- break if !submit_actions || submit_actions.empty?
-
- if @max_flush_exceptions > 0 and count_as_attempt == true
- attempts += 1
-
- if attempts > @max_flush_exceptions
- @logger.error("JDBC - max_flush_exceptions has been reached. #{submit_actions.length} events have been unable to be sent to SQL and are being dropped. See previously logged exceptions for details.")
- break
- end
- end
-
- # If we're retrying the action sleep for the recommended interval
- # Double the interval for the next time through to achieve exponential backoff
- Stud.stoppable_sleep(sleep_interval) { @stopping.true? }
- sleep_interval = next_sleep_interval(sleep_interval)
- end
- end
-
- def make_create_statement(event, keys_we_care_about)
- columns = []
-
- keys_we_care_about.each_with_index do |key, idx|
- wrapped = turn_into_wrapped(key)
-
- case event.get(wrapped)
- when Time, LogStash::Timestamp
- columns << "#{key} TIMESTAMP"
- when Integer
- columns << "#{key} BIGINT"
- when BigDecimal
- columns << "#{key} DECIMAL"
- when Float
- columns << "#{key} DOUBLE PRECISION"
- when String, Array, Hash
- columns << "#{key} VARCHAR"
- when true, false
- columns << "#{key} BOOLEAN"
- end
- end
-
- "CREATE TABLE IF NOT EXISTS #{create_table_name(event)} (#{columns.join(', ')})"
- end
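For reference, the removed make_create_statement above mapped Ruby value classes to column types: Time/LogStash::Timestamp to TIMESTAMP, Integer to BIGINT, BigDecimal to DECIMAL, Float to DOUBLE PRECISION, String/Array/Hash to VARCHAR, and booleans to BOOLEAN. With a hypothetical event and an empty schema setting, the 0.1.1 code would have generated roughly the statements sketched here:

# Hypothetical event fields (after create_statement drops @-prefixed keys, host, path):
#   "event" => "page view", "user_id" => 42, "duration" => 1.5, "active" => true
#
# Approximate 0.1.1 output (column order follows hash order):
#   INSERT INTO page_view (event, user_id, duration, active) VALUES (?, ?, ?, ?)
#   CREATE TABLE IF NOT EXISTS page_view
#     (event VARCHAR, user_id BIGINT, duration DOUBLE PRECISION, active BOOLEAN)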
-
- def add_statement_event_params(statement, event, keys_we_care_about)
- keys_we_care_about.each_with_index do |key, idx|
- if @enable_event_as_json_keyword == true and key.is_a? String and key == @event_as_json_keyword
- value = event.to_json
- elsif key.is_a? String
- value = event.get(key)
- if value.nil? and key =~ /%\{/
- value = event.sprintf(key)
- end
- else
- value = key
- end
-
- case value
- when Time
- statement.setString(idx + 1, value.strftime(STRFTIME_FMT))
- when LogStash::Timestamp
- statement.setString(idx + 1, value.time.strftime(STRFTIME_FMT))
- when Integer
- if value > 2147483647 or value < -2147483648
- statement.setLong(idx + 1, value)
- else
- statement.setInt(idx + 1, value)
- end
- when BigDecimal
- statement.setBigDecimal(idx + 1, value.to_java)
- when Float
- statement.setFloat(idx + 1, value)
- when String
- statement.setString(idx + 1, value)
- when Array, Hash
- statement.setString(idx + 1, value.to_json)
- when true, false
- statement.setBoolean(idx + 1, value)
- else
- statement.setString(idx + 1, nil)
- end
- end
-
- statement
- end
-
- def retry_exception?(exception, event)
- retrying = (exception.respond_to? 'getSQLState' and (RETRYABLE_SQLSTATE_CLASSES.include?(exception.getSQLState.to_s[0,2]) or @retry_sql_states.include?(exception.getSQLState)))
- log_jdbc_exception(exception, retrying, event)
-
- retrying
- end
-
def log_jdbc_exception(exception, retrying, event)
current_exception = exception
log_text = 'JDBC - Exception. ' + (retrying ? 'Retrying' : 'Not retrying')
log_method = (retrying ? 'warn' : 'error')
loop do
# TODO reformat event output so that it only shows the fields necessary.
- @logger.send(log_method, log_text, :exception => current_exception, :statement => @statement[0], :event => event)
+ @logger.send(log_method, log_text, :exception => current_exception, :event => event)
if current_exception.respond_to? 'getNextException'
current_exception = current_exception.getNextException()
else
current_exception = nil
end
break if current_exception == nil
end
- end
-
- def next_sleep_interval(current_interval)
- doubled = current_interval * 2
- doubled > @retry_max_interval ? @retry_max_interval : doubled
- end
-end # class LogStash::Outputs::Charrington
-
-class TableNameNil < StandardError
- attr_reader :event
-
- def initialize(msg='Table name is nil', event={})
- @event = event
- super(msg)
end
end