lib/logstash/outputs/charrington/process.rb in logstash-output-charrington-0.3.12 vs lib/logstash/outputs/charrington/process.rb in logstash-output-charrington-0.3.13

- old
+ new

@@ -5,11 +5,11 @@ # This service starts the process of attempting to insert a row. # It handles retries where applicable. include Service include LogStash::Util::Loggable - attr_reader :event, :connection, :opts, :max_retries, :schema, :retry_max_interval, :driver + attr_reader :event, :connection, :opts, :max_retries, :schema, :retry_max_interval, :driver, :transformer attr_accessor :retry_interval, :should_retry Error = Class.new(StandardError) ProcessFailed = Class.new(Error) EventNil = Class.new(Error) @@ -22,22 +22,23 @@ @max_retries = opts[:max_retries] || 10 @retry_max_interval = opts[:retry_max_interval] || 2 @retry_interval = opts[:retry_initial_interval] || 2 @driver = opts[:driver] + @transformer = opts[:transformer] @attempts = 1 @should_retry = true end def call while should_retry do - transformed = case driver + transformed = case transformer when "redshift" - self.logger.info "Found driver for redshift with event of: #{event}" + self.logger.info "Found transformer of #{transformer} for driver of #{driver} with event of: #{event}" Charrington::TransformRedshift.call(event) else - self.logger.info "Found driver for postgres with event of: #{event}" + self.logger.info "Found transformer of #{transformer} for driver of #{driver} with event of: #{event}" Charrington::TransformPostgres.call(event) end self.logger.info "Transformed event into: #{transformed}" should_retry = Charrington::Insert.call(connection, transformed, opts) break if !should_retry