lib/fluent/plugin/out_sql.rb in fluent-plugin-sql-0.5.1 vs lib/fluent/plugin/out_sql.rb in fluent-plugin-sql-0.5.2
- old
+ new
@@ -1,5 +1,7 @@
+require "fluent/output"
+
module Fluent
class SQLOutput < BufferedOutput
Plugin.register_output('sql', self)
include SetTimeKeyMixin
@@ -27,10 +29,12 @@
config_param :database, :string
desc 'RDBMS socket path'
config_param :socket, :string, :default => nil
desc 'remove the given prefix from the events'
config_param :remove_tag_prefix, :string, :default => nil
+ desc 'enable fallback'
+ config_param :enable_fallback, :bool, :default => true
attr_accessor :tables
unless method_defined?(:log)
define_method(:log) { $log }
@@ -45,14 +49,15 @@
config_param :num_retries, :integer, :default => 5
attr_reader :model
attr_reader :pattern
- def initialize(pattern, log)
+ def initialize(pattern, log, enable_fallback)
super()
@pattern = MatchPattern.create(pattern)
@log = log
+ @enable_fallback = enable_fallback
end
def configure(conf)
super
@@ -95,13 +100,18 @@
end
}
begin
@model.import(records)
rescue ActiveRecord::StatementInvalid, ActiveRecord::Import::MissingColumnError => e
- # ignore other exceptions to use Fluentd retry mechanizm
- @log.warn "Got deterministic error. Fallback to one-by-one import", :error => e.message, :error_class => e.class
- one_by_one_import(records)
+ if @enable_fallback
+ # ignore other exceptions to use Fluentd retry mechanizm
+ @log.warn "Got deterministic error. Fallback to one-by-one import", :error => e.message, :error_class => e.class
+ one_by_one_import(records)
+ else
+ $log.warn "Got deterministic error. Fallback is disabled", :error => e.message, :error_class => e.class
+ raise e
+ end
end
end
def one_by_one_import(records)
records.each { |record|
@@ -153,10 +163,10 @@
@tables = []
@default_table = nil
conf.elements.select { |e|
e.name == 'table'
}.each { |e|
- te = TableElement.new(e.arg, log)
+ te = TableElement.new(e.arg, log, @enable_fallback)
te.configure(e)
if e.arg.empty?
$log.warn "Detect duplicate default table definition" if @default_table
@default_table = te
else