lib/fluent/plugin/in_sql.rb in fluent-plugin-sql-2.0.0 vs lib/fluent/plugin/in_sql.rb in fluent-plugin-sql-2.1.0

- old
+ new

@@ -58,17 +58,20 @@ config_param :tag, :string, default: nil config_param :update_column, :string, default: nil config_param :time_column, :string, default: nil config_param :primary_key, :string, default: nil + attr_reader :log + def configure(conf) super end - def init(tag_prefix, base_model, router) + def init(tag_prefix, base_model, router, log) @router = router @tag = "#{tag_prefix}.#{@tag}" if tag_prefix + @log = log # creates a model for this table table_name = @table primary_key = @primary_key @model = Class.new(base_model) do @@ -106,10 +109,21 @@ end @update_column = pk.name end end + # Make sure we always have a Fluent::EventTime object regardless of what comes in + def normalized_time(tv, now) + return Fluent::EventTime.from_time(tv) if tv.is_a?(Time) + begin + Fluent::EventTime.parse(tv.to_s) + rescue + log.warn "Message contains invalid timestamp, using current time instead (#{now.inspect})" + now + end + end + # emits next records and returns the last record of emitted records def emit_next_records(last_record, limit) relation = @model if last_record && last_update_value = last_record[@update_column] relation = relation.where("#{@update_column} > ?", last_update_value) @@ -121,19 +135,17 @@ me = Fluent::MultiEventStream.new relation.each do |obj| record = obj.serializable_hash rescue nil if record - if @time_column && tv = obj.read_attribute(@time_column) - if tv.is_a?(Time) - time = tv.to_i + time = + if @time_column && (tv = obj.read_attribute(@time_column)) + normalized_time(tv, now) else - time = Time.parse(tv.to_s).to_i rescue now + now end - else - time = now - end + me.add(time, record) last_record = record end end @@ -215,10 +227,10 @@ end # ignore tables if TableElement#init failed @tables.reject! do |te| begin - te.init(@tag_prefix, @base_model, router) + te.init(@tag_prefix, @base_model, router, log) log.info "Selecting '#{te.table}' table" false rescue => e log.warn "Can't handle '#{te.table}' table. Ignoring.", error: e log.warn_backtrace e.backtrace