lib/fluent/plugin/in_sql.rb in fluent-plugin-sql-0.2.1 vs lib/fluent/plugin/in_sql.rb in fluent-plugin-sql-0.2.2
- old
+ new
@@ -52,30 +52,47 @@
end
def init(tag_prefix, base_model)
@tag = "#{tag_prefix}.#{@tag}" if tag_prefix
+ # creates a model for this table
table_name = @table
@model = Class.new(base_model) do
self.table_name = table_name
self.inheritance_column = '_never_use_'
+ #self.include_root_in_json = false
+
+ def read_attribute_for_serialization(n)
+ v = send(n)
+ if v.respond_to?(:to_msgpack)
+ v
+ else
+ v.to_s
+ end
+ end
end
+
+ # ActiveRecord requires model class to have a name.
class_name = table_name.singularize.camelize
base_model.const_set(class_name, @model)
+
+ # Sets model_name otherwise ActiveRecord causes errors
model_name = ActiveModel::Name.new(@model, nil, class_name)
@model.define_singleton_method(:model_name) { model_name }
+ # if update_column is not set, here uses primary key
unless @update_column
columns = Hash[@model.columns.map {|c| [c.name, c] }]
pk = columns[@model.primary_key]
unless pk
raise "Composite primary key is not supported. Set update_column parameter to <table> section."
end
@update_column = pk.name
end
end
+ # emits next records and returns the last record of emitted records
def emit_next_records(last_record, limit)
relation = @model
if last_record && last_update_value = last_record[@update_column]
relation = relation.where("#{@update_column} > ?", last_update_value)
end
@@ -84,22 +101,27 @@
now = Engine.now
entry_name = @model.table_name.singularize
me = MultiEventStream.new
relation.each do |obj|
- record = obj.as_json[entry_name] rescue nil
+ record = obj.serializable_hash rescue nil
if record
- if tv = record[@time_column]
- time = Time.parse(tv.to_s) rescue now
+ if @time_column && tv = obj.read_attribute(@time_column)
+ if tv.is_a?(Time)
+ time = tv.to_i
+ else
+ time = Time.parse(tv.to_s).to_i rescue now
+ end
else
time = now
end
me.add(time, record)
last_record = record
end
end
+ last_record = last_record.dup # some plugin rewrites record :(
Engine.emit_stream(@tag, me)
return last_record
end
end
@@ -132,19 +154,30 @@
:database => @database,
:username => @username,
:password => @password,
}
+ # creates subclass of ActiveRecord::Base so that it can have different
+ # database configuration from ActiveRecord::Base.
@base_model = Class.new(ActiveRecord::Base) do
+ # base model doesn't have corresponding phisical table
self.abstract_class = true
end
+
+ # ActiveRecord requires the base_model to have a name. Here sets name
+ # of an anonymous class by assigning it to a constant. In Ruby, class has
+ # a name of a constant assigned first
SQLInput.const_set("BaseModel_#{rand(1<<31)}", @base_model)
+
+ # Now base_model can have independent configuration from ActiveRecord::Base
@base_model.establish_connection(config)
if @all_tables
+ # get list of tables from the database
@tables = @base_model.connection.tables.map do |table_name|
if table_name.match(SKIP_TABLE_REGEXP)
+ # some tables such as "schema_migrations" should be ignored
nil
else
te = TableElement.new
te.configure({
'table' => table_name,
@@ -154,10 +187,11 @@
te
end
end.compact
end
+ # ignore tables if TableElement#init failed
@tables.reject! do |te|
begin
te.init(@tag_prefix, @base_model)
$log.info "Selecting '#{te.table}' table"
false
@@ -196,9 +230,15 @@
class StateStore
def initialize(path)
@path = path
if File.exists?(@path)
@data = YAML.load_file(@path)
+ if @data == false || @data == []
+ # this happens if an users created an empty file accidentally
+ @data = {}
+ elsif !@data.is_a?(Hash)
+ raise "state_file on #{@path.inspect} is invalid"
+ end
else
@data = {}
end
end