lib/fluent/plugin/in_mysql_appender.rb in fluent-plugin-mysql-appender-0.1.3 vs lib/fluent/plugin/in_mysql_appender.rb in fluent-plugin-mysql-appender-0.1.4
- old
+ new
@@ -16,11 +16,14 @@
config_param :port, :integer, :default => 3306
config_param :username, :string, :default => 'root'
config_param :password, :string, :default => nil, :secret => true
config_param :database, :string, :default => nil
config_param :encoding, :string, :default => 'utf8'
+ config_param :last_id, :integer, :default => -1
+ config_param :limit, :integer, :default => 100
config_param :query, :string
+ config_param :time_column, :string, :default => nil
config_param :primary_key, :string, :default => 'id'
config_param :interval, :string, :default => '1m'
config_param :tag, :string, :default => nil
def configure(conf)
@@ -29,11 +32,11 @@
if @tag.nil?
raise Fluent::ConfigError, "mysql_appender: missing 'tag' parameter. Please add following line into config like 'tag replicator.mydatabase.mytable.${event}.${primary_key}'"
end
- $log.info "adding mysql_appender worker. :tag=>#{tag} :query=>#{@query} :prepared_query=>#{@prepared_query} :interval=>#{@interval}sec"
+ $log.info "adding mysql_appender worker. :tag=>#{tag} :query=>#{@query} :limit=>#{limit} :interval=>#{@interval} sec "
end
def start
@thread = Thread.new(&method(:run))
end
@@ -52,31 +55,31 @@
end
end
def poll
con = get_connection()
- max_id = -1
loop do
rows_count = 0
start_time = Time.now
- if max_id == -1
- select_query = @query.gsub(/"/,'') + " order by #{primary_key} asc"
- else
- select_query = @query.gsub(/"/,'') + " where #{primary_key} > #{max_id} order by #{primary_key} asc"
- end
+ select_query = @query.gsub(/"/,'') + " where #{primary_key} > #{last_id} order by #{primary_key} asc limit #{limit}"
rows, con = query(select_query, con)
rows.each_with_index do |row, index|
tag = format_tag(@tag, {:event => :insert})
+ if @time_column.nil? then
+ td_time = Engine.now
+ else
+ td_time = Time.parse(row[@time_column]).to_i
+ end
row.each {|k, v| row[k] = v.to_s if v.is_a?(Time) || v.is_a?(Date) || v.is_a?(BigDecimal)}
- router.emit(tag, Engine.now, row)
+ router.emit(tag, td_time, row)
rows_count += 1
if index == rows.size - 1
- max_id = row[@primary_key]
+ @last_id = row[@primary_key]
end
end
con.close
elapsed_time = sprintf("%0.02f", Time.now - start_time)
- $log.info "mysql_appender: finished execution :tag=>#{tag} :rows_count=>#{rows_count} :elapsed_time=>#{elapsed_time} sec"
+ $log.info "mysql_appender: finished execution :tag=>#{tag} :rows_count=>#{rows_count} :last_id=>#{last_id} :elapsed_time=>#{elapsed_time} sec"
sleep @interval
end
end
def format_tag(tag, param)