lib/fluent/plugin/out_mysql_bulk.rb in fluent-plugin-mysql-bulk-0.0.4 vs lib/fluent/plugin/out_mysql_bulk.rb in fluent-plugin-mysql-bulk-0.0.5

- old
+ new

@@ -1,22 +1,22 @@ # -*- encoding : utf-8 -*- module Fluent class Fluent::MysqlBulkOutput < Fluent::BufferedOutput Fluent::Plugin.register_output('mysql_bulk', self) - config_param :host, :string, :default => "127.0.0.1" - config_param :port, :integer, :default => 3306 + config_param :host, :string, default: '127.0.0.1' + config_param :port, :integer, default: 3306 config_param :database, :string config_param :username, :string - config_param :password, :string, :default => '' + config_param :password, :string, default: '' config_param :column_names, :string - config_param :key_names, :string, :default => nil + config_param :key_names, :string, default: nil config_param :table, :string - config_param :on_duplicate_key_update, :bool, :default => false - config_param :on_duplicate_update_keys, :string, :default => nil + config_param :on_duplicate_key_update, :bool, default: false + config_param :on_duplicate_update_keys, :string, default: nil attr_accessor :handler def initialize super @@ -25,70 +25,98 @@ def configure(conf) super if @column_names.nil? - raise Fluent::ConfigError, "column_names MUST be specified, but missing" + fail Fluent::ConfigError, 'column_names MUST specified, but missing' end if @on_duplicate_key_update if @on_duplicate_update_keys.nil? - raise Fluent::ConfigError, "on_duplicate_key_update = true , on_duplicate_update_keys nil!" + fail Fluent::ConfigError, 'on_duplicate_key_update = true , on_duplicate_update_keys nil!' end @on_duplicate_update_keys = @on_duplicate_update_keys.split(',') - @on_duplicate_key_update_sql = " ON DUPLICATE KEY UPDATE " + @on_duplicate_key_update_sql = ' ON DUPLICATE KEY UPDATE ' updates = [] - @on_duplicate_update_keys.each{|update_column| + @on_duplicate_update_keys.each do |update_column| updates.push(" #{update_column} = VALUES(#{update_column})") - } + end @on_duplicate_key_update_sql += updates.join(',') end @column_names = @column_names.split(',') @key_names = @key_names.nil? ? @column_names : @key_names.split(',') - @format_proc = Proc.new{|tag, time, record| @key_names.map{|k| k == '${time}' ? Time.at(time).strftime("%Y-%m-%d %H:%M:%S") : record[k]}} - end def start super + result = client.xquery("SHOW COLUMNS FROM #{@table}") + @max_lengths = [] + @column_names.each do |column| + info = result.select { |x| x['Field'] == column }.first + r = /(char|varchar)\(([\d]+)\)/ + begin + max_length = info['Type'].scan(r)[0][1].to_i + rescue + max_length = nil + end + @max_lengths << max_length + end end def shutdown super end def format(tag, time, record) - [tag, time, @format_proc.call(tag, time, record)].to_msgpack + [tag, time, format_proc.call(tag, time, record)].to_msgpack end def client - Mysql2::Client.new({ - :host => @host, - :port => @port, - :username => @username, - :password => @password, - :database => @database, - :flags => Mysql2::Client::MULTI_STATEMENTS - }) + Mysql2::Client.new( + host: @host, + port: @port, + username: @username, + password: @password, + database: @database, + flags: Mysql2::Client::MULTI_STATEMENTS + ) end def write(chunk) @handler = client values_templates = [] - values = Array.new - chunk.msgpack_each { |tag, time, data| - values_templates.push "(#{@column_names.map{|key| '?'}.join(',')})" + values = [] + chunk.msgpack_each do |tag, time, data| + values_templates << "(#{ @column_names.map { |key| '?' }.join(',') })" values.concat(data) - } - sql = "INSERT INTO #{@table} (#{@column_names.join(',')}) VALUES #{values_templates.join(',')}" - if @on_duplicate_key_update - sql += @on_duplicate_key_update_sql end + sql = "INSERT INTO #{@table} (#{@column_names.join(',')}) VALUES #{values_templates.join(',')}" + sql += @on_duplicate_key_update_sql if @on_duplicate_key_update - $log.info "bulk insert sql => [#{Mysql2::Client.pseudo_bind(sql, values)}]" + $log.info "bulk insert values size => #{values_templates.size}" @handler.xquery(sql, values) end + private + + def format_proc + proc do |tag, time, record| + values = [] + @key_names.each_with_index do |key, i| + if key == '${time}' + value = Time.at(time).strftime('%Y-%m-%d %H:%M:%S') + else + if @max_lengths[i].nil? || record[key].nil? + value = record[key] + else + value = record[key].slice(0, @max_lengths[i]) + end + end + values << value + end + values + end + end end end