lib/fluent/plugin/out_mysql_load.rb in fluent-plugin-mysql-load-0.0.1 vs lib/fluent/plugin/out_mysql_load.rb in fluent-plugin-mysql-load-0.0.2
- old
+ new
@@ -19,21 +19,23 @@
config_param :port, :integer, :default => 3306
config_param :username, :string, :default => 'root'
config_param :password, :string, :default => nil
config_param :database, :string, :default => nil
config_param :tablename, :string, :default => nil
- config_param :columns, :string, :default => nil
+ config_param :key_names, :string, :default => nil
+ config_param :column_names, :string, :default => nil
config_param :encoding, :string, :default => 'utf8'
def configure(conf)
super
- if @database.nil? || @tablename.nil?
- raise Fluent::ConfigError, "database and tablename is required!"
+ if @database.nil? || @tablename.nil? || @column_names.nil?
+ raise Fluent::ConfigError, "database and tablename and column_names is required."
end
- if (!@columns.nil?)
- @columns = @columns.split(",")
+ @key_names = @key_names.nil? ? @column_names.split(',') : @key_names.split(',')
+ unless @column_names.split(',').count == @key_names.count
+ raise Fluent::ConfigError, "It does not take the integrity of the key_names and column_names."
end
end
def start
super
@@ -42,47 +44,42 @@
def shutdown
super
end
def format(tag, time, record)
- record.to_msgpack
+ values = @key_names.map { |k|
+ k == '${time}' ? Time.at(time).strftime('%Y-%m-%d %H:%M:%S') : record[k]
+ }
+ values.to_msgpack
end
def write(chunk)
- tmp = Tempfile.new("loaddata")
- keys = nil
+ data_count = 0
+ tmp = Tempfile.new("mysql-loaddata")
chunk.msgpack_each { |record|
- # keyの取得は初回のみ
- if keys.nil?
- # columnsが指定されている場合はそっちを有効にする
- keys = @columns.nil? ? record.keys : @columns
- end
-
- values = []
- keys.each{ |key|
- values << record[key]
- }
-
- tmp.write values.join("\t") + "\n"
+ tmp.write record.join("\t") + "\n"
+ data_count += 1
}
tmp.close
- query = QUERY_TEMPLATE % ([tmp.path, @tablename, keys.join(",")])
-
conn = get_connection
- conn.query(query)
+ conn.query(QUERY_TEMPLATE % ([tmp.path, @tablename, @column_names]))
conn.close
+
+ log.info "number that is registered in the \"%s:%s\" table is %d" % ([@database, @tablename, data_count])
end
private
+
def get_connection
Mysql2::Client.new({
:host => @host,
:port => @port,
:username => @username,
:password => @password,
:database => @database,
- :encoding => @encoding
+ :encoding => @encoding,
+ :local_infile => true
})
end
end
end