lib/fluent/plugin/out_mysql_load.rb in fluent-plugin-mysql-load-0.0.1 vs lib/fluent/plugin/out_mysql_load.rb in fluent-plugin-mysql-load-0.0.2

- old
+ new

@@ -19,21 +19,23 @@ config_param :port, :integer, :default => 3306 config_param :username, :string, :default => 'root' config_param :password, :string, :default => nil config_param :database, :string, :default => nil config_param :tablename, :string, :default => nil - config_param :columns, :string, :default => nil + config_param :key_names, :string, :default => nil + config_param :column_names, :string, :default => nil config_param :encoding, :string, :default => 'utf8' def configure(conf) super - if @database.nil? || @tablename.nil? - raise Fluent::ConfigError, "database and tablename is required!" + if @database.nil? || @tablename.nil? || @column_names.nil? + raise Fluent::ConfigError, "database and tablename and column_names is required." end - if (!@columns.nil?) - @columns = @columns.split(",") + @key_names = @key_names.nil? ? @column_names.split(',') : @key_names.split(',') + unless @column_names.split(',').count == @key_names.count + raise Fluent::ConfigError, "It does not take the integrity of the key_names and column_names." end end def start super @@ -42,47 +44,42 @@ def shutdown super end def format(tag, time, record) - record.to_msgpack + values = @key_names.map { |k| + k == '${time}' ? Time.at(time).strftime('%Y-%m-%d %H:%M:%S') : record[k] + } + values.to_msgpack end def write(chunk) - tmp = Tempfile.new("loaddata") - keys = nil + data_count = 0 + tmp = Tempfile.new("mysql-loaddata") chunk.msgpack_each { |record| - # keyの取得は初回のみ - if keys.nil? - # columnsが指定されている場合はそっちを有効にする - keys = @columns.nil? ? record.keys : @columns - end - - values = [] - keys.each{ |key| - values << record[key] - } - - tmp.write values.join("\t") + "\n" + tmp.write record.join("\t") + "\n" + data_count += 1 } tmp.close - query = QUERY_TEMPLATE % ([tmp.path, @tablename, keys.join(",")]) - conn = get_connection - conn.query(query) + conn.query(QUERY_TEMPLATE % ([tmp.path, @tablename, @column_names])) conn.close + + log.info "number that is registered in the \"%s:%s\" table is %d" % ([@database, @tablename, data_count]) end private + def get_connection Mysql2::Client.new({ :host => @host, :port => @port, :username => @username, :password => @password, :database => @database, - :encoding => @encoding + :encoding => @encoding, + :local_infile => true }) end end end