require 'fluent/plugin/input'
require 'fluent/config/error'

module Fluent::Plugin
  # Fluentd input plugin that periodically runs +query+ in every customer
  # database of every MySQL shard and emits one event per result row.
  #
  # The shards are reached through an SSH gateway: each MySQL connection goes
  # to 127.0.0.1:+ssh_local_port+, which Net::SSH::Gateway forwards to port 3306
  # of the current +@host+.
  #
  # One cycle (see #run):
  #   1. read the shard map once via +shard_map_query+ (#get_shards);
  #   2. for every shard host, run +query+ in each of its databases
  #      (#process_shard_dbs), tagging events "<tag>_<db>_<shard host>";
  #   3. wait +interval+ seconds (interruptible by #shutdown).
  class SQLQueryInput < Input
    Fluent::Plugin.register_input('sqlquery_ssh', self)

    helpers :thread, :storage, :compat_parameters

    unless method_defined?(:log)
      define_method(:log) { $log }
    end

    # Values #value_to_boolean treats as true (modelled on ActiveRecord's
    # Column::TRUE_VALUES, from which these conversion helpers were adapted).
    TRUE_VALUES = [true, 1, '1', 't', 'T', 'true', 'TRUE', 'on', 'ON'].freeze

    # Result columns that #get_exec_result converts to Integer when present.
    INTEGER_COLUMNS = %w[total_transactions cust_of_customer shop_count items register_count].freeze

    # Seconds between stop-flag checks while waiting for the next cycle.
    STOP_POLL_SECONDS = 1

    def initialize
      super
      require 'mysql2'
      require 'net/ssh/gateway'
      require 'bigdecimal'
      require 'bigdecimal/util' # String#to_d etc., used by #value_to_decimal
    end

    config_param :host, :string, :default => '127.0.0.1'
    config_param :shard_db_name, :string, :default => 'customer'
    # NOTE(review): declared but not currently used by this plugin.
    config_param :skip_shards, :array, :default => nil
    # Required (validated in #configure). Rows must provide `database_host` and a
    # comma-separated `database_name` column.
    config_param :shard_map_query, :string, :default => nil
    # Private key handed to Net::SSH (:keys). This is a path/name, so :string.
    config_param :ssh_key, :string, :default => 'id_rsa'
    config_param :ssh_local_port, :integer, :default => 33009
    config_param :ssh_gateway, :string, :default => nil
    config_param :ssh_username, :string, :default => nil, :secret => true
    config_param :ssh_passphrase, :string, :default => nil, :secret => true
    config_param :db_username, :string, :default => 'root', :secret => true
    # FIXME(security): a real-looking credential is hard-coded as the default.
    # Configure db_password explicitly, rotate this password, then drop the default.
    config_param :db_password, :string, :default => 'atqM2rQZxp9GMY', :secret => true
    #config_param :database, :string, :default => nil
    #config_param :databases, :array, :default => nil #databases if passed in.
    config_param :db_port, :integer, :default => 33006
    config_param :encoding, :string, :default => 'utf8'
    # Seconds between cycles. 86400 is one day (formerly written as '1d'). The
    # default is numeric so the sleep logic always gets a number.
    config_param :interval, :time, :default => 86400
    config_param :tag, :string
    config_param :cast, :hash, :default => nil #cast a hash of key (column) to cast as (type)
    config_param :query, :string
    config_param :nest_result, :bool, :default => false
    config_param :nest_key, :string, :default => 'data'
    config_param :row_count, :bool, :default => true
    config_param :row_count_key, :string, :default => 'row_count'
    config_param :record_hostname, :bool, :default => true
    # Passed to Mysql2::Client (seconds).
    config_param :connect_timeout, :integer, :default => 30
    config_param :read_timeout, :integer, :default => 600
    config_param :init_sql, :string, :default => nil

    # @raise [Fluent::ConfigError] when shard_map_query is missing; the plugin
    #   cannot discover any shards without it.
    def configure(conf)
      super
      if @shard_map_query.nil? || @shard_map_query.strip.empty?
        raise Fluent::ConfigError, "sqlquery_ssh: 'shard_map_query' is required"
      end
    end

    # Starts the worker thread that executes #run.
    def start
      super
      @stop = false
      @ssh_port = nil # reset before the worker thread can read it
      @thread = Thread.new(&method(:run))
    end

    # Signals the worker to stop, waits for it, then releases the MySQL client,
    # the port forward and the SSH session.
    # NOTE: join can wait for an in-flight query (bounded by read_timeout).
    def shutdown
      @stop = true
      if @thread
        @thread.join
        @thread = nil
      end
      close_connection
      if @ssh_gate
        gate = @ssh_gate
        @ssh_gate = nil
        quietly('shut down ssh gateway') { gate.shutdown! }
      end
      super
    end

    # Worker loop: one full pass over all shards, then wait +interval+ seconds.
    # Each cycle's errors are logged and the next cycle retries. Connections are
    # always released at the end of a cycle.
    def run
      log.info "Begin run."
      until @stop
        begin
          # Inside the rescue, so a failed shard-map read is logged and retried
          # next cycle instead of silently killing the thread.
          @shard_map ||= get_shards
          process_all_shards
          log.info "completed run all shards. sleeping for #{@interval}\n"
        rescue StandardError => e
          log.error "error on shard: #{@current_shard}\n"
          log.error "#{e.message}\n#{e.backtrace.join("\n")}\n"
        ensure
          close_connection
        end
        interruptible_sleep(@interval)
      end
    rescue StandardError => e
      log.error "fluent-plugin-sqlquery-ssh: run loop terminated: #{e.message}\n#{e.backtrace.join("\n")}\n"
    end

    # Reads the shard map from +shard_db_name+ using +shard_map_query+.
    #
    # @return [Array<Hash{String => Array<String>}>] one single-entry hash per
    #   row, mapping database_host to its list of database names.
    # @raise [StandardError] re-raised after logging, so #run can retry later.
    def get_shards
      @conn ||= get_connection
      raise "no connection to #{@host} for reading the shard map" unless @conn

      @conn.select_db(@shard_db_name) #switch the customerdb
      log.info "Getting Shard Mapping: [#{@shard_map_query}]"
      # Lets GROUP_CONCAT-built database_name lists grow without truncation.
      @conn.query("SET SESSION group_concat_max_len= 1844674407370")
      shardlist = @conn.query(@shard_map_query, :cast => false, :cache_rows => false)
      return [] unless shardlist

      shardlist.map do |row|
        customer_dbs = row['database_name'].to_s.split(',')
        cust_host = row['database_host']
        log.info "adding #{customer_dbs}\n"
        { cust_host => customer_dbs }
      end
    rescue StandardError
      log.error "Can't get shard info\n"
      raise
    ensure
      if @conn
        conn = @conn
        @conn = nil # never leave a closed client behind for the next ||=
        quietly('close shard-map connection') { conn.close }
      end
      quietly('close ssh forward') { @ssh_gate.close(@ssh_local_port) } if @ssh_gate
    end

    # Runs +query+ in each database of one shard and emits every row.
    #
    # @param remotehost [String] shard host as listed in the shard map
    # @param db_names [Array<String>] databases on that shard
    # @raise [RuntimeError] if no MySQL connection could be opened
    def process_shard_dbs(remotehost, db_names)
      db_names.each do |db|
        log.info "customer: #{db} shard: #{remotehost}"
        return if @stop

        @mysql ||= get_connection #open conn to shard.
        raise "fluent-plugin-sqlquery-ssh: no connection to shard #{remotehost}" unless @mysql

        @current_shard = remotehost
        @mysql.select_db(db) #switch the customerdb
        tag = "#{@tag}_#{db}_#{remotehost}"
        result = get_exec_result
        result.each { |data| router.emit(tag, Fluent::Engine.now, data) }
        log.info "completed #{db} rows: #{result.size}\n"
      end
    end

    # Runs +query+ on the current connection and normalizes known columns.
    # Rows are fetched uncast (strings), so numeric columns are converted here.
    #
    # @return [Array<Hash>] rows, or [] if the query failed
    def get_exec_result
      stmt = query(@query)
      return [] unless stmt

      stmt.map do |row|
        #to be replaced by the cast array
        row['avg'] = row['avg'].to_f.round(2) if row['avg']
        row['daily_total'] = row['daily_total'].to_f if row['daily_total']
        INTEGER_COLUMNS.each { |col| row[col] = row[col].to_i if row[col] }
        row['customer_shard'] = @current_shard if @current_shard
        row
      end
    end

    # Runs +query+ on the shard connection, without casting or row caching.
    #
    # @return [Mysql2::Result, nil] nil when there is no connection or the
    #   query raised (the error is logged)
    def query(query)
      return if @mysql.nil?
      @mysql.query(query, :cast => false, :cache_rows => false)
    rescue StandardError => e
      log.error "fluent-plugin-sqlquery-ssh: Query ERROR!\n"
      log.error "#{e.message}\n#{e.backtrace.join("\n")}\n"
      nil
    end

    # Returns +true+ if +type+ is :string or :text. This class has no column
    # object, so the type is passed in explicitly.
    def text?(type = nil)
      type == :string || type == :text
    end

    # Returns +true+ if +type+ is :integer, :float or :decimal.
    def number?(type = nil)
      type == :integer || type == :float || type == :decimal
    end

    # Converts +value+ to a boolean. An empty String yields nil; otherwise the
    # result is membership in TRUE_VALUES.
    def value_to_boolean(value)
      if value.is_a?(String) && value.empty?
        nil
      else
        TRUE_VALUES.include?(value)
      end
    end

    # Converts +value+ to an Integer. true/false become 1/0, which handles
    # integer columns used to store booleans. Returns nil when +value+ cannot be
    # converted.
    def value_to_integer(value)
      case value
      when TrueClass, FalseClass
        value ? 1 : 0
      else
        begin
          value.to_i
        rescue StandardError
          nil
        end
      end
    end

    # Converts +value+ to a BigDecimal. Relies on 'bigdecimal/util' (required in
    # #initialize) for #to_d.
    def value_to_decimal(value)
      # Using .class is faster than .is_a?, and subclasses of BigDecimal are
      # handled by the branches below.
      if value.class == BigDecimal
        value
      elsif value.respond_to?(:to_d)
        value.to_d
      else
        value.to_s.to_d
      end
    end

    # Opens (or reuses) the SSH gateway, forwards +ssh_local_port+ to port 3306
    # of +@host+, and connects a Mysql2 client through the forward.
    #
    # @return [Mysql2::Client, nil] nil on failure (the error is logged)
    def get_connection
      log.info "Opening ssh tunnel to #{@ssh_gateway}\n"
      @ssh_gate ||= Net::SSH::Gateway.new(@ssh_gateway, @ssh_username, ssh_options)
      @ssh_port = @ssh_gate.open(@host, 3306, @ssh_local_port) || @db_port
      log.info "connecting to #{@host}\n"
      Mysql2::Client.new(
        :host => '127.0.0.1',
        :port => @ssh_port,
        :username => @db_username,
        :password => @db_password,
        :encoding => @encoding,
        :reconnect => true,
        :read_timeout => @read_timeout,
        :connect_timeout => @connect_timeout
      )
    rescue StandardError => e
      log.error "fluent-plugin-sqlquery-ssh: Main Connect ERROR!\n"
      log.error "MSG: #{e.message}\n TRACE:#{e.backtrace.join("\n")} PORT: #{@ssh_port}\n"
      # Drop a half-opened forward so the next attempt can bind the port again.
      quietly('close ssh forward') { @ssh_gate.close(@ssh_local_port) } if @ssh_gate
      nil
    end

    # @return [String, nil] the server's `hostname` variable, or nil if the
    #   query failed or returned no rows
    def get_mysql_hostname
      rows = query("SHOW VARIABLES LIKE 'hostname'")
      return nil unless rows
      row = rows.first
      row && row.fetch('Value')
    end

    private

    # Iterates the cached shard map. Before each host it resets the connection
    # and points +@host+ at that shard's reporting endpoint.
    def process_all_shards
      @shard_map.each do |shard|
        shard.each do |remotehost, db_names|
          return if @stop

          close_connection # new host => new forward + client
          hostname = remotehost.gsub(/mylocaldb([0-9]*)/, "customers\\1-reporting.db.prd.us-east-silo.ls")
          @host = hostname
          log.info "start shard: #{hostname}\n"
          process_shard_dbs(remotehost, db_names)
          log.info "finished shard: #{hostname}\n"
        end
      end
    end

    # Options for Net::SSH::Gateway. The passphrase is included only when one
    # is configured.
    def ssh_options
      opts = { :verbose => :debug, :keys => [@ssh_key], :forward_agent => true }
      opts[:passphrase] = @ssh_passphrase if @ssh_passphrase
      opts
    end

    # Closes the shard MySQL client (if any) and the local port forward.
    # Never raises.
    def close_connection
      if @mysql
        mysql = @mysql
        @mysql = nil
        quietly('close mysql connection') { mysql.close }
      end
      quietly('close ssh forward') { @ssh_gate.close(@ssh_local_port) } if @ssh_gate
    end

    # Sleeps up to +seconds+, returning early once @stop is set, so that
    # shutdown does not have to wait out a long +interval+.
    def interruptible_sleep(seconds)
      deadline = Process.clock_gettime(Process::CLOCK_MONOTONIC) + seconds
      until @stop
        remaining = deadline - Process.clock_gettime(Process::CLOCK_MONOTONIC)
        break if remaining <= 0
        sleep([remaining, STOP_POLL_SECONDS].min)
      end
    end

    # Runs a cleanup block and logs, rather than raises, any StandardError, so
    # that a failure releasing one resource does not block releasing the next.
    def quietly(what)
      yield
    rescue StandardError => e
      log.warn "fluent-plugin-sqlquery-ssh: failed to #{what}: #{e.message}"
    end
  end
end