lib/fluent/plugin/in_sql.rb in fluent-plugin-sql-0.2.1 vs lib/fluent/plugin/in_sql.rb in fluent-plugin-sql-0.2.2

- old
+ new

@@ -52,30 +52,47 @@ end def init(tag_prefix, base_model) @tag = "#{tag_prefix}.#{@tag}" if tag_prefix + # creates a model for this table table_name = @table @model = Class.new(base_model) do self.table_name = table_name self.inheritance_column = '_never_use_' + #self.include_root_in_json = false + + def read_attribute_for_serialization(n) + v = send(n) + if v.respond_to?(:to_msgpack) + v + else + v.to_s + end + end end + + # ActiveRecord requires model class to have a name. class_name = table_name.singularize.camelize base_model.const_set(class_name, @model) + + # Sets model_name otherwise ActiveRecord causes errors model_name = ActiveModel::Name.new(@model, nil, class_name) @model.define_singleton_method(:model_name) { model_name } + # if update_column is not set, here uses primary key unless @update_column columns = Hash[@model.columns.map {|c| [c.name, c] }] pk = columns[@model.primary_key] unless pk raise "Composite primary key is not supported. Set update_column parameter to <table> section." end @update_column = pk.name end end + # emits next records and returns the last record of emitted records def emit_next_records(last_record, limit) relation = @model if last_record && last_update_value = last_record[@update_column] relation = relation.where("#{@update_column} > ?", last_update_value) end @@ -84,22 +101,27 @@ now = Engine.now entry_name = @model.table_name.singularize me = MultiEventStream.new relation.each do |obj| - record = obj.as_json[entry_name] rescue nil + record = obj.serializable_hash rescue nil if record - if tv = record[@time_column] - time = Time.parse(tv.to_s) rescue now + if @time_column && tv = obj.read_attribute(@time_column) + if tv.is_a?(Time) + time = tv.to_i + else + time = Time.parse(tv.to_s).to_i rescue now + end else time = now end me.add(time, record) last_record = record end end + last_record = last_record.dup # some plugin rewrites record :( Engine.emit_stream(@tag, me) return last_record end end @@ -132,19 +154,30 @@ :database => @database, :username => @username, :password => @password, } + # creates subclass of ActiveRecord::Base so that it can have different + # database configuration from ActiveRecord::Base. @base_model = Class.new(ActiveRecord::Base) do + # base model doesn't have corresponding phisical table self.abstract_class = true end + + # ActiveRecord requires the base_model to have a name. Here sets name + # of an anonymous class by assigning it to a constant. In Ruby, class has + # a name of a constant assigned first SQLInput.const_set("BaseModel_#{rand(1<<31)}", @base_model) + + # Now base_model can have independent configuration from ActiveRecord::Base @base_model.establish_connection(config) if @all_tables + # get list of tables from the database @tables = @base_model.connection.tables.map do |table_name| if table_name.match(SKIP_TABLE_REGEXP) + # some tables such as "schema_migrations" should be ignored nil else te = TableElement.new te.configure({ 'table' => table_name, @@ -154,10 +187,11 @@ te end end.compact end + # ignore tables if TableElement#init failed @tables.reject! do |te| begin te.init(@tag_prefix, @base_model) $log.info "Selecting '#{te.table}' table" false @@ -196,9 +230,15 @@ class StateStore def initialize(path) @path = path if File.exists?(@path) @data = YAML.load_file(@path) + if @data == false || @data == [] + # this happens if an users created an empty file accidentally + @data = {} + elsif !@data.is_a?(Hash) + raise "state_file on #{@path.inspect} is invalid" + end else @data = {} end end