lib/fluent/plugin/in_mysql_query.rb in fluent-plugin-mysql-query-0.3.0 vs lib/fluent/plugin/in_mysql_query.rb in fluent-plugin-mysql-query-1.0.0
- old
+ new
@@ -1,99 +1,95 @@
-module Fluent
- class MysqlQueryInput < Fluent::Input
- Plugin.register_input('mysql_query', self)
+require 'fluent/plugin/input'
+require 'mysql2'
- def initialize
- require 'mysql2'
- super
- end
+module Fluent::Plugin
+ class MysqlQueryInput < Fluent::Plugin::Input
+ Fluent::Plugin.register_input('mysql_query', self)
- config_param :host, :string, :default => 'localhost'
- config_param :port, :integer, :default => 3306
- config_param :username, :string, :default => 'root'
- config_param :password, :string, :default => nil
- config_param :database, :string, :default => nil
- config_param :encoding, :string, :default => 'utf8'
- config_param :interval, :time, :default => '1m'
+ helpers :timer
+
+ config_param :host, :string, default: 'localhost'
+ config_param :port, :integer, default: 3306
+ config_param :username, :string, default: 'root'
+ config_param :password, :string, default: nil, secret: true
+ config_param :database, :string, default: nil
+ config_param :encoding, :string, default: 'utf8'
+ config_param :interval, :time, default: '1m'
config_param :tag, :string
config_param :query, :string
- config_param :nest_result, :bool, :default => false
- config_param :nest_key, :string, :default => 'result'
- config_param :row_count, :bool, :default => false
- config_param :row_count_key, :string, :default => 'row_count'
- config_param :record_hostname, :bool, :default => false
+ config_param :nest_result, :bool, default: false
+ config_param :nest_key, :string, default: 'result'
+ config_param :row_count, :bool, default: false
+ config_param :row_count_key, :string, default: 'row_count'
+ config_param :record_hostname, :bool, default: false
def configure(conf)
super
@hostname = nil
$log.info "adding mysql_query job: [#{@query}] interval: #{@interval}sec"
end
def start
- @thread = Thread.new(&method(:run))
+ super
+ timer_execute(:in_mysql_query, @interval, &method(:on_timer))
end
- def shutdown
- Thread.kill(@thread)
- end
-
- def run
- loop do
- @hostname = get_mysql_hostname if @hostname.nil?
- tag = "#{@tag}".gsub('__HOSTNAME__', @hostname).gsub('${hostname}', @hostname)
- record = Hash.new
- record.store('hostname', @hostname) if @record_hostname
- result = get_exec_result
- record.store(@row_count_key, result.size) if @row_count
- if (@nest_result)
- record.store(@nest_key, result)
- Engine.emit(tag, Engine.now, record)
- else
- result.each do |data|
- Engine.emit(tag, Engine.now, record.merge(data))
- end
+ def on_timer
+ @hostname = get_mysql_hostname if @hostname.nil?
+ tag = "#{@tag}".gsub('__HOSTNAME__', @hostname).gsub('${hostname}', @hostname)
+ record = {}
+ record.store('hostname', @hostname) if @record_hostname
+ result = get_exec_result
+ record.store(@row_count_key, result.size) if @row_count
+ if (@nest_result)
+ record.store(@nest_key, result)
+ router.emit(tag, Fluent::Engine.now, record)
+ else
+ result.each do |data|
+ router.emit(tag, Fluent::Engine.now, record.merge(data))
end
- sleep @interval
end
end
def get_connection
begin
return Mysql2::Client.new({
- :host => @host,
- :port => @port,
- :username => @username,
- :password => @password,
- :database => @database,
- :encoding => @encoding,
- :reconnect => true
+ host: @host,
+ port: @port,
+ username: @username,
+ password: @password,
+ database: @database,
+ encoding: @encoding,
+ reconnect: true
})
rescue Exception => e
- $log.warn "mysql_query: #{e}"
+ log.warn "mysql_query: #{e}"
sleep @interval
retry
end
end
def query(query)
@mysql ||= get_connection
begin
- return @mysql.query(query, :cast => false, :cache_rows => false)
+ return @mysql.query(query, cast: false, cache_rows: false)
rescue Exception => e
- $log.warn "mysql_query: #{e}"
+ log.warn "mysql_query: #{e}"
sleep @interval
retry
end
end
def get_mysql_hostname
query("SHOW VARIABLES LIKE 'hostname'").each do |row|
return row.fetch('Value')
end
+ # hostname variable is not present
+ return ''
end
def get_exec_result
- result = Array.new
+ result = []
stmt = query(@query)
stmt.each do |row|
result.push(row)
end
return result