lib/fluent/plugin/out_sql.rb in fluent-plugin-sql-0.6.1 vs lib/fluent/plugin/out_sql.rb in fluent-plugin-sql-1.0.0.rc1

- old
+ new

@@ -1,61 +1,54 @@ -require "fluent/output" +require "fluent/plugin/output" -module Fluent - class SQLOutput < BufferedOutput - Plugin.register_output('sql', self) +module Fluent::Plugin + class SQLOutput < Output + Fluent::Plugin.register_output('sql', self) - include SetTimeKeyMixin - include SetTagKeyMixin + DEFAULT_BUFFER_TYPE = "memory" - # For fluentd v0.12.16 or earlier - class << self - unless method_defined?(:desc) - def desc(description) - end - end - end + helpers :inject, :compat_parameters, :event_emitter desc 'RDBMS host' config_param :host, :string desc 'RDBMS port' - config_param :port, :integer, :default => nil + config_param :port, :integer, default: nil desc 'RDBMS driver name.' config_param :adapter, :string desc 'RDBMS login user name' - config_param :username, :string, :default => nil + config_param :username, :string, default: nil desc 'RDBMS login password' - config_param :password, :string, :default => nil, :secret => true + config_param :password, :string, default: nil, secret: true desc 'RDBMS database name' config_param :database, :string desc 'RDBMS socket path' - config_param :socket, :string, :default => nil + config_param :socket, :string, default: nil desc 'remove the given prefix from the events' - config_param :remove_tag_prefix, :string, :default => nil + config_param :remove_tag_prefix, :string, default: nil desc 'enable fallback' - config_param :enable_fallback, :bool, :default => true + config_param :enable_fallback, :bool, default: true - attr_accessor :tables - - unless method_defined?(:log) - define_method(:log) { $log } + config_section :buffer do + config_set_default :@type, DEFAULT_BUFFER_TYPE end + attr_accessor :tables + # TODO: Merge SQLInput's TableElement class TableElement - include Configurable + include Fluent::Configurable config_param :table, :string config_param :column_mapping, :string - config_param :num_retries, :integer, :default => 5 + config_param :num_retries, :integer, default: 5 attr_reader :model attr_reader :pattern def initialize(pattern, log, enable_fallback) super() - @pattern = MatchPattern.create(pattern) + @pattern = Fluent::MatchPattern.create(pattern) @log = log @enable_fallback = enable_fallback end def configure(conf) @@ -93,23 +86,23 @@ chunk.msgpack_each { |tag, time, data| begin # format process should be moved to emit / format after supports error stream. records << @model.new(@format_proc.call(data)) rescue => e - args = {:error => e.message, :error_class => e.class, :table => @table, :record => Yajl.dump(data)} + args = {error: e, table: @table, record: Yajl.dump(data)} @log.warn "Failed to create the model. Ignore a record:", args end } begin @model.import(records) rescue ActiveRecord::StatementInvalid, ActiveRecord::Import::MissingColumnError => e if @enable_fallback # ignore other exceptions to use Fluentd retry mechanizm - @log.warn "Got deterministic error. Fallback to one-by-one import", :error => e.message, :error_class => e.class + @log.warn "Got deterministic error. Fallback to one-by-one import", error: e one_by_one_import(records) else - $log.warn "Got deterministic error. Fallback is disabled", :error => e.message, :error_class => e.class + $log.warn "Got deterministic error. Fallback is disabled", error: e raise e end end end @@ -117,19 +110,19 @@ records.each { |record| retries = 0 begin @model.import([record]) rescue ActiveRecord::StatementInvalid, ActiveRecord::Import::MissingColumnError => e - @log.error "Got deterministic error again. Dump a record", :error => e.message, :error_class => e.class, :record => record + @log.error "Got deterministic error again. Dump a record", error: e, record: record rescue => e retries += 1 if retries > @num_retries - @log.error "Can't recover undeterministic error. Dump a record", :error => e.message, :error_class => e.class, :record => record + @log.error "Can't recover undeterministic error. Dump a record", error: e, record: record next end - @log.warn "Failed to import a record: retry number = #{retries}", :error => e.message, :error_class => e.class + @log.warn "Failed to import a record: retry number = #{retries}", error: e sleep 0.5 retry end } end @@ -152,10 +145,12 @@ require 'active_record' require 'activerecord-import' end def configure(conf) + compat_parameters_convert(conf, :inject, :buffer) + super if remove_tag_prefix = conf['remove_tag_prefix'] @remove_tag_prefix = Regexp.new('^' + Regexp.escape(remove_tag_prefix)) end @@ -175,25 +170,25 @@ end } @only_default = @tables.empty? if @default_table.nil? - raise ConfigError, "There is no default table. <table> is required in sql output" + raise Fluent::ConfigError, "There is no default table. <table> is required in sql output" end end def start super config = { - :adapter => @adapter, - :host => @host, - :port => @port, - :database => @database, - :username => @username, - :password => @password, - :socket => @socket, + adapter: @adapter, + host: @host, + port: @port, + database: @database, + username: @username, + password: @password, + socket: @socket, } @base_model = Class.new(ActiveRecord::Base) do self.abstract_class = true end @@ -219,13 +214,18 @@ super(tag, es, chain, format_tag(tag)) end end def format(tag, time, record) + record = inject_values_to_record(tag, time, record) [tag, time, record].to_msgpack end + def formatted_to_msgpack_binary + true + end + def write(chunk) ActiveRecord::Base.connection_pool.with_connection do @tables.each { |table| if table.pattern.match(chunk.key) @@ -242,10 +242,10 @@ begin te.init(base_model) log.info "Selecting '#{te.table}' table" false rescue => e - log.warn "Can't handle '#{te.table}' table. Ignoring.", :error => e.message, :error_class => e.class + log.warn "Can't handle '#{te.table}' table. Ignoring.", error: e log.warn_backtrace e.backtrace true end end