lib/fluent/plugin/out_sql.rb in fluent-plugin-sql-0.6.1 vs lib/fluent/plugin/out_sql.rb in fluent-plugin-sql-1.0.0.rc1
- old
+ new
@@ -1,61 +1,54 @@
-require "fluent/output"
+require "fluent/plugin/output"
-module Fluent
- class SQLOutput < BufferedOutput
- Plugin.register_output('sql', self)
+module Fluent::Plugin
+ class SQLOutput < Output
+ Fluent::Plugin.register_output('sql', self)
- include SetTimeKeyMixin
- include SetTagKeyMixin
+ DEFAULT_BUFFER_TYPE = "memory"
- # For fluentd v0.12.16 or earlier
- class << self
- unless method_defined?(:desc)
- def desc(description)
- end
- end
- end
+ helpers :inject, :compat_parameters, :event_emitter
desc 'RDBMS host'
config_param :host, :string
desc 'RDBMS port'
- config_param :port, :integer, :default => nil
+ config_param :port, :integer, default: nil
desc 'RDBMS driver name.'
config_param :adapter, :string
desc 'RDBMS login user name'
- config_param :username, :string, :default => nil
+ config_param :username, :string, default: nil
desc 'RDBMS login password'
- config_param :password, :string, :default => nil, :secret => true
+ config_param :password, :string, default: nil, secret: true
desc 'RDBMS database name'
config_param :database, :string
desc 'RDBMS socket path'
- config_param :socket, :string, :default => nil
+ config_param :socket, :string, default: nil
desc 'remove the given prefix from the events'
- config_param :remove_tag_prefix, :string, :default => nil
+ config_param :remove_tag_prefix, :string, default: nil
desc 'enable fallback'
- config_param :enable_fallback, :bool, :default => true
+ config_param :enable_fallback, :bool, default: true
- attr_accessor :tables
-
- unless method_defined?(:log)
- define_method(:log) { $log }
+ config_section :buffer do
+ config_set_default :@type, DEFAULT_BUFFER_TYPE
end
+ attr_accessor :tables
+
# TODO: Merge SQLInput's TableElement
class TableElement
- include Configurable
+ include Fluent::Configurable
config_param :table, :string
config_param :column_mapping, :string
- config_param :num_retries, :integer, :default => 5
+ config_param :num_retries, :integer, default: 5
attr_reader :model
attr_reader :pattern
def initialize(pattern, log, enable_fallback)
super()
- @pattern = MatchPattern.create(pattern)
+ @pattern = Fluent::MatchPattern.create(pattern)
@log = log
@enable_fallback = enable_fallback
end
def configure(conf)
@@ -93,23 +86,23 @@
chunk.msgpack_each { |tag, time, data|
begin
# format process should be moved to emit / format after supports error stream.
records << @model.new(@format_proc.call(data))
rescue => e
- args = {:error => e.message, :error_class => e.class, :table => @table, :record => Yajl.dump(data)}
+ args = {error: e, table: @table, record: Yajl.dump(data)}
@log.warn "Failed to create the model. Ignore a record:", args
end
}
begin
@model.import(records)
rescue ActiveRecord::StatementInvalid, ActiveRecord::Import::MissingColumnError => e
if @enable_fallback
# ignore other exceptions to use Fluentd retry mechanizm
- @log.warn "Got deterministic error. Fallback to one-by-one import", :error => e.message, :error_class => e.class
+ @log.warn "Got deterministic error. Fallback to one-by-one import", error: e
one_by_one_import(records)
else
- $log.warn "Got deterministic error. Fallback is disabled", :error => e.message, :error_class => e.class
+ $log.warn "Got deterministic error. Fallback is disabled", error: e
raise e
end
end
end
@@ -117,19 +110,19 @@
records.each { |record|
retries = 0
begin
@model.import([record])
rescue ActiveRecord::StatementInvalid, ActiveRecord::Import::MissingColumnError => e
- @log.error "Got deterministic error again. Dump a record", :error => e.message, :error_class => e.class, :record => record
+ @log.error "Got deterministic error again. Dump a record", error: e, record: record
rescue => e
retries += 1
if retries > @num_retries
- @log.error "Can't recover undeterministic error. Dump a record", :error => e.message, :error_class => e.class, :record => record
+ @log.error "Can't recover undeterministic error. Dump a record", error: e, record: record
next
end
- @log.warn "Failed to import a record: retry number = #{retries}", :error => e.message, :error_class => e.class
+ @log.warn "Failed to import a record: retry number = #{retries}", error: e
sleep 0.5
retry
end
}
end
@@ -152,10 +145,12 @@
require 'active_record'
require 'activerecord-import'
end
def configure(conf)
+ compat_parameters_convert(conf, :inject, :buffer)
+
super
if remove_tag_prefix = conf['remove_tag_prefix']
@remove_tag_prefix = Regexp.new('^' + Regexp.escape(remove_tag_prefix))
end
@@ -175,25 +170,25 @@
end
}
@only_default = @tables.empty?
if @default_table.nil?
- raise ConfigError, "There is no default table. <table> is required in sql output"
+ raise Fluent::ConfigError, "There is no default table. <table> is required in sql output"
end
end
def start
super
config = {
- :adapter => @adapter,
- :host => @host,
- :port => @port,
- :database => @database,
- :username => @username,
- :password => @password,
- :socket => @socket,
+ adapter: @adapter,
+ host: @host,
+ port: @port,
+ database: @database,
+ username: @username,
+ password: @password,
+ socket: @socket,
}
@base_model = Class.new(ActiveRecord::Base) do
self.abstract_class = true
end
@@ -219,13 +214,18 @@
super(tag, es, chain, format_tag(tag))
end
end
def format(tag, time, record)
+ record = inject_values_to_record(tag, time, record)
[tag, time, record].to_msgpack
end
+ def formatted_to_msgpack_binary
+ true
+ end
+
def write(chunk)
ActiveRecord::Base.connection_pool.with_connection do
@tables.each { |table|
if table.pattern.match(chunk.key)
@@ -242,10 +242,10 @@
begin
te.init(base_model)
log.info "Selecting '#{te.table}' table"
false
rescue => e
- log.warn "Can't handle '#{te.table}' table. Ignoring.", :error => e.message, :error_class => e.class
+ log.warn "Can't handle '#{te.table}' table. Ignoring.", error: e
log.warn_backtrace e.backtrace
true
end
end