lib/fluent/plugin/in_mysql_appender.rb in fluent-plugin-mysql-appender-0.3.2 vs lib/fluent/plugin/in_mysql_appender.rb in fluent-plugin-mysql-appender-0.3.3
- old
+ new
@@ -24,10 +24,11 @@
config_param :query, :string
config_param :time_column, :string, :default => nil
config_param :primary_key, :string, :default => 'id'
config_param :interval, :string, :default => '1m'
config_param :tag, :string, :default => nil
+ config_param :buffer, :integer, :defalut => 0
def configure(conf)
super
@interval = Config.time_value(@interval)
@@ -61,25 +62,30 @@
loop do
rows_count = 0
start_time = Time.now
select_query = @query.gsub(/"/,'') + " where #{primary_key} > #{last_id} order by #{primary_key} asc limit #{limit}"
rows, con = query(select_query, con)
- rows.each_with_index do |row, index|
- tag = format_tag(@tag, {:event => :insert})
- if @time_column.nil? then
- td_time = Engine.now
- else
- if row[@time_column].kind_of?(Time) then
- td_time = row[@time_column].to_i
- else
- td_time = Time.parse(row[@time_column].to_s).to_i
- end
+ if @buffer < rows.size then
+ if index == (rows.size - @buffer) then
+ break
end
- row.each {|k, v| row[k] = v.to_s if v.is_a?(Time) || v.is_a?(Date) || v.is_a?(BigDecimal)}
- router.emit(tag, td_time, row)
- rows_count += 1
- if index == rows.size - 1
- @last_id = row[@primary_key]
+ rows.each_with_index do |row, index|
+ tag = format_tag(@tag, {:event => :insert})
+ if @time_column.nil? then
+ td_time = Engine.now
+ else
+ if row[@time_column].kind_of?(Time) then
+ td_time = row[@time_column].to_i
+ else
+ td_time = Time.parse(row[@time_column].to_s).to_i
+ end
+ end
+ row.each {|k, v| row[k] = v.to_s if v.is_a?(Time) || v.is_a?(Date) || v.is_a?(BigDecimal)}
+ router.emit(tag, td_time, row)
+ rows_count += 1
+ if index == rows.size - @buffer - 1
+ @last_id = row[@primary_key]
+ end
end
end
con.close
elapsed_time = sprintf("%0.02f", Time.now - start_time)
$log.info "mysql_appender: finished execution :tag=>#{tag} :rows_count=>#{rows_count} :last_id=>#{last_id} :elapsed_time=>#{elapsed_time} sec"