#
# Fluent
#
# Copyright (C) 2013 FURUHASHI Sadayuki
#
#    Licensed under the Apache License, Version 2.0 (the "License");
#    you may not use this file except in compliance with the License.
#    You may obtain a copy of the License at
#
#        http://www.apache.org/licenses/LICENSE-2.0
#
#    Unless required by applicable law or agreed to in writing, software
#    distributed under the License is distributed on an "AS IS" BASIS,
#    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#    See the License for the specific language governing permissions and
#    limitations under the License.
#

require "fluent/plugin/input"
require 'active_record'
require 'yaml'

module Fluent::Plugin
  # Input plugin registered as 'sql'. A background thread wakes up every
  # `select_interval` seconds, selects rows from each configured table whose
  # update column is greater than the last emitted value, and emits them
  # through the router. The last emitted record per table is kept in a state
  # store so that the next poll continues from there.
  class SQLInput < Input
    Fluent::Plugin.register_input('sql', self)

    desc 'RDBMS host'
    config_param :host, :string
    desc 'RDBMS port'
    config_param :port, :integer, default: nil
    desc 'RDBMS driver name.'
    config_param :adapter, :string
    desc 'RDBMS database name'
    config_param :database, :string
    desc 'RDBMS login user name'
    config_param :username, :string, default: nil
    desc 'RDBMS login password'
    config_param :password, :string, default: nil, secret: true
    desc 'RDBMS socket path'
    config_param :socket, :string, default: nil
    desc 'PostgreSQL schema search path'
    config_param :schema_search_path, :string, default: nil

    desc 'path to a file to store last rows'
    config_param :state_file, :string, default: nil
    desc 'prefix of tags of events. actual tag will be this_tag_prefix.tables_tag (optional)'
    config_param :tag_prefix, :string, default: nil
    desc 'interval to run SQLs (optional)'
    config_param :select_interval, :time, default: 60
    desc 'limit of number of rows for each SQL(optional)'
    # A row count, not a duration: declared as :integer so it reaches
    # `relation.limit` as an Integer.
    config_param :select_limit, :integer, default: 500

    # Settings and runtime state for one polled table (one <table> section).
    class TableElement
      include Fluent::Configurable

      config_param :table, :string
      config_param :tag, :string, default: nil
      config_param :update_column, :string, default: nil
      config_param :time_column, :string, default: nil
      config_param :primary_key, :string, default: nil
      config_param :time_format, :string, default: '%Y-%m-%d %H:%M:%S.%6N%z'
      attr_reader :log

      def configure(conf)
        super
      end

      # Builds the ActiveRecord model used to query this table.
      #
      # @param tag_prefix [String, nil] when set, the tag becomes "tag_prefix.tag"
      # @param base_model [Class] abstract ActiveRecord class holding the connection
      # @param router     event router that receives the emitted streams
      # @param log        logger used for warnings raised by this element
      # @raise [RuntimeError] when update_column is unset and the model's
      #   primary key does not resolve to a single column
      def init(tag_prefix, base_model, router, log)
        @router = router
        @tag = "#{tag_prefix}.#{@tag}" if tag_prefix
        @log = log

        # Copy into locals: instance variables are not visible inside the
        # Class.new block below (self is the new class there).
        table_name = @table
        primary_key = @primary_key
        time_format = @time_format

        # creates a model for this table
        @model = Class.new(base_model) do
          self.table_name = table_name
          self.inheritance_column = '_never_use_'
          self.primary_key = primary_key if primary_key
          const_set(:TIME_FORMAT, time_format)

          #self.include_root_in_json = false

          # Values that respond to to_msgpack are passed through untouched,
          # Time values are formatted with TIME_FORMAT, and everything else
          # is converted with to_s.
          def read_attribute_for_serialization(n)
            v = send(n)
            if v.respond_to?(:to_msgpack)
              v
            elsif v.is_a? Time
              v.strftime(self.class::TIME_FORMAT)
            else
              v.to_s
            end
          end
        end

        # ActiveRecord requires model class to have a name.
        class_name = table_name.gsub(/\./, "_").singularize.camelize
        base_model.const_set(class_name, @model)

        # Sets model_name otherwise ActiveRecord causes errors
        model_name = ActiveModel::Name.new(@model, nil, class_name)
        @model.define_singleton_method(:model_name) { model_name }

        # if update_column is not set, here uses primary key
        unless @update_column
          pk = @model.columns_hash[@model.primary_key]
          unless pk
            raise "Composite primary key is not supported. Set update_column parameter to <table> section."
          end
          @update_column = pk.name
        end
      end

      # Make sure we always have a Fluent::EventTime object regardless of what comes in
      #
      # @param tv  value read from the time column
      # @param now fallback returned when tv cannot be parsed
      def normalized_time(tv, now)
        return Fluent::EventTime.from_time(tv) if tv.is_a?(Time)

        begin
          Fluent::EventTime.parse(tv.to_s)
        rescue
          log.warn "Message contains invalid timestamp, using current time instead (#{now.inspect})"
          now
        end
      end

      # emits next records and returns the last record of emitted records
      #
      # @param last_record [Hash, nil] record returned by the previous call
      # @param limit [Numeric] maximum rows to select; no LIMIT is applied
      #   unless it is greater than 0
      # @return [Hash, nil] a copy of the last emitted record, or of
      #   last_record when no row was emitted
      def emit_next_records(last_record, limit)
        relation = @model
        if last_record && (last_update_value = last_record[@update_column])
          relation = relation.where("#{@update_column} > ?", last_update_value)
        end
        relation = relation.order("#{@update_column} ASC")
        relation = relation.limit(limit) if limit > 0

        now = Fluent::Engine.now

        me = Fluent::MultiEventStream.new
        relation.each do |obj|
          # Best effort: a row that fails to serialize is skipped.
          record = obj.serializable_hash rescue nil
          next unless record

          time =
            if @time_column && (tv = obj.read_attribute(@time_column))
              normalized_time(tv, now)
            else
              now
            end

          me.add(time, record)
          last_record = record
        end

        last_record = last_record.dup if last_record  # some plugin rewrites record :(
        @router.emit_stream(@tag, me)

        last_record
      end
    end

    # Reads the plugin parameters and builds one TableElement per <table>
    # section. Warns when no state_file is configured.
    def configure(conf)
      super

      unless @state_file
        log.warn "'state_file PATH' parameter is not set to a 'sql' source."
        log.warn "this parameter is highly recommended to save the last rows to resume tailing."
      end

      @tables = conf.elements.select {|e|
        e.name == 'table'
      }.map {|e|
        te = TableElement.new
        te.configure(e)
        te
      }

      if config['all_tables']
        @all_tables = true
      end
    end

    SKIP_TABLE_REGEXP = /\Aschema_migrations\Z/i

    # Opens the state store, connects to the database, initializes every
    # table element (dropping the ones that fail) and starts the polling
    # thread.
    def start
      super

      @state_store = @state_file.nil? ? MemoryStateStore.new : StateStore.new(@state_file)

      db_config = {
        adapter: @adapter,
        host: @host,
        port: @port,
        database: @database,
        username: @username,
        password: @password,
        socket: @socket,
        schema_search_path: @schema_search_path,
      }

      # creates subclass of ActiveRecord::Base so that it can have different
      # database configuration from ActiveRecord::Base.
      @base_model = Class.new(ActiveRecord::Base) do
        # base model doesn't have corresponding physical table
        self.abstract_class = true
      end

      # ActiveRecord requires the base_model to have a name. Here sets name
      # of an anonymous class by assigning it to a constant. In Ruby, class has
      # a name of a constant assigned first
      SQLInput.const_set("BaseModel_#{rand(1 << 31)}", @base_model)

      # Now base_model can have independent configuration from ActiveRecord::Base
      @base_model.establish_connection(db_config)

      if @all_tables
        # get list of tables from the database
        @tables = @base_model.connection.tables.map do |table_name|
          if table_name.match(SKIP_TABLE_REGEXP)
            # some tables such as "schema_migrations" should be ignored
            nil
          else
            te = TableElement.new
            # NOTE(review): configure receives a plain Hash here, while the
            # <table> path passes a config element - confirm that
            # Fluent::Configurable#configure accepts a Hash.
            te.configure({
              'table' => table_name,
              'tag' => table_name,
              'update_column' => nil,
            })
            te
          end
        end.compact
      end

      # ignore tables if TableElement#init failed
      @tables.reject! do |te|
        begin
          te.init(@tag_prefix, @base_model, router, log)
          log.info "Selecting '#{te.table}' table"
          false
        rescue => e
          log.warn "Can't handle '#{te.table}' table. Ignoring.", error: e
          log.warn_backtrace e.backtrace
          true
        end
      end

      @stop_flag = false
      @thread = Thread.new(&method(:thread_main))
    end

    # Asks the polling thread to stop and waits for it to finish.
    def shutdown
      @stop_flag = true
      log.debug "Waiting for thread to finish"
      # @thread is only set at the end of #start; guard against a start that
      # did not get that far.
      @thread.join if @thread

      super
    end

    # Body of the polling thread: sleep, make sure the connection is alive,
    # then emit new rows for each table and persist the state after every
    # table. Errors for one table are logged and do not stop the loop.
    def thread_main
      until @stop_flag
        sleep @select_interval

        begin
          conn = @base_model.connection
          conn.active? || conn.reconnect!
        rescue => e
          log.warn "can't connect to database. Reconnect at next try", error: e
          next
        end

        @tables.each do |t|
          begin
            last_record = @state_store.last_records[t.table]
            @state_store.last_records[t.table] = t.emit_next_records(last_record, @select_limit)
            @state_store.update!
          rescue => e
            log.error "unexpected error", error: e
            log.error_backtrace e.backtrace
          end
        end
      end
    end

    # State store that persists its data as YAML in a file.
    class StateStore
      # @param path [String] location of the YAML state file
      # @raise [RuntimeError] when the file holds something other than a Hash
      def initialize(path)
        @path = path
        if File.exist?(@path)
          @data = YAML.load_file(@path)
          if @data.nil? || @data == false || @data == []
            # this happens if a user created an empty file accidentally
            # (an empty document loads as nil or false depending on the
            # YAML library version)
            @data = {}
          elsif !@data.is_a?(Hash)
            raise "state_file on #{@path.inspect} is invalid"
          end
        else
          @data = {}
        end
      end

      # @return [Hash] last emitted record per table name
      def last_records
        @data['last_records'] ||= {}
      end

      # Rewrites the state file with the current data.
      def update!
        File.open(@path, 'w') {|f|
          f.write YAML.dump(@data)
        }
      end
    end

    # State store used when no state_file is configured: keeps the data in
    # memory only.
    class MemoryStateStore
      def initialize
        @data = {}
      end

      # @return [Hash] last emitted record per table name
      def last_records
        @data['last_records'] ||= {}
      end

      # Nothing to persist.
      def update!
      end
    end
  end
end