lib/fluent/plugin/in_mysql_appender.rb in fluent-plugin-mysql-appender-0.3.2 vs lib/fluent/plugin/in_mysql_appender.rb in fluent-plugin-mysql-appender-0.3.3

- old
+ new

@@ -24,10 +24,11 @@ config_param :query, :string config_param :time_column, :string, :default => nil config_param :primary_key, :string, :default => 'id' config_param :interval, :string, :default => '1m' config_param :tag, :string, :default => nil + config_param :buffer, :integer, :defalut => 0 def configure(conf) super @interval = Config.time_value(@interval) @@ -61,25 +62,30 @@ loop do rows_count = 0 start_time = Time.now select_query = @query.gsub(/"/,'') + " where #{primary_key} > #{last_id} order by #{primary_key} asc limit #{limit}" rows, con = query(select_query, con) - rows.each_with_index do |row, index| - tag = format_tag(@tag, {:event => :insert}) - if @time_column.nil? then - td_time = Engine.now - else - if row[@time_column].kind_of?(Time) then - td_time = row[@time_column].to_i - else - td_time = Time.parse(row[@time_column].to_s).to_i - end + if @buffer < rows.size then + if index == (rows.size - @buffer) then + break end - row.each {|k, v| row[k] = v.to_s if v.is_a?(Time) || v.is_a?(Date) || v.is_a?(BigDecimal)} - router.emit(tag, td_time, row) - rows_count += 1 - if index == rows.size - 1 - @last_id = row[@primary_key] + rows.each_with_index do |row, index| + tag = format_tag(@tag, {:event => :insert}) + if @time_column.nil? then + td_time = Engine.now + else + if row[@time_column].kind_of?(Time) then + td_time = row[@time_column].to_i + else + td_time = Time.parse(row[@time_column].to_s).to_i + end + end + row.each {|k, v| row[k] = v.to_s if v.is_a?(Time) || v.is_a?(Date) || v.is_a?(BigDecimal)} + router.emit(tag, td_time, row) + rows_count += 1 + if index == rows.size - @buffer - 1 + @last_id = row[@primary_key] + end end end con.close elapsed_time = sprintf("%0.02f", Time.now - start_time) $log.info "mysql_appender: finished execution :tag=>#{tag} :rows_count=>#{rows_count} :last_id=>#{last_id} :elapsed_time=>#{elapsed_time} sec"