lib/fluent/plugin/in_sql.rb in fluent-plugin-sql-2.0.0 vs lib/fluent/plugin/in_sql.rb in fluent-plugin-sql-2.1.0
- old
+ new
@@ -58,17 +58,20 @@
config_param :tag, :string, default: nil
config_param :update_column, :string, default: nil
config_param :time_column, :string, default: nil
config_param :primary_key, :string, default: nil
+ attr_reader :log
+
def configure(conf)
super
end
- def init(tag_prefix, base_model, router)
+ def init(tag_prefix, base_model, router, log)
@router = router
@tag = "#{tag_prefix}.#{@tag}" if tag_prefix
+ @log = log
# creates a model for this table
table_name = @table
primary_key = @primary_key
@model = Class.new(base_model) do
@@ -106,10 +109,21 @@
end
@update_column = pk.name
end
end
+ # Make sure we always have a Fluent::EventTime object regardless of what comes in
+ def normalized_time(tv, now)
+ return Fluent::EventTime.from_time(tv) if tv.is_a?(Time)
+ begin
+ Fluent::EventTime.parse(tv.to_s)
+ rescue
+ log.warn "Message contains invalid timestamp, using current time instead (#{now.inspect})"
+ now
+ end
+ end
+
# emits next records and returns the last record of emitted records
def emit_next_records(last_record, limit)
relation = @model
if last_record && last_update_value = last_record[@update_column]
relation = relation.where("#{@update_column} > ?", last_update_value)
@@ -121,19 +135,17 @@
me = Fluent::MultiEventStream.new
relation.each do |obj|
record = obj.serializable_hash rescue nil
if record
- if @time_column && tv = obj.read_attribute(@time_column)
- if tv.is_a?(Time)
- time = tv.to_i
+ time =
+ if @time_column && (tv = obj.read_attribute(@time_column))
+ normalized_time(tv, now)
else
- time = Time.parse(tv.to_s).to_i rescue now
+ now
end
- else
- time = now
- end
+
me.add(time, record)
last_record = record
end
end
@@ -215,10 +227,10 @@
end
# ignore tables if TableElement#init failed
@tables.reject! do |te|
begin
- te.init(@tag_prefix, @base_model, router)
+ te.init(@tag_prefix, @base_model, router, log)
log.info "Selecting '#{te.table}' table"
false
rescue => e
log.warn "Can't handle '#{te.table}' table. Ignoring.", error: e
log.warn_backtrace e.backtrace