# frozen_string_literal: true

require 'fluent/plugin/output'
require 'oj'

module Fluent::Plugin
  # Output plugin that buffers events and writes them to MySQL in bulk INSERT statements.
  class MysqlBulkOutput < Output
    Fluent::Plugin.register_output('mysql_bulk', self)

    helpers :compat_parameters, :inject

    config_param :host, :string, default: '127.0.0.1',
                 desc: "Database host."
    config_param :port, :integer, default: 3306,
                 desc: "Database port."
    config_param :database, :string,
                 desc: "Database name."
    config_param :username, :string,
                 desc: "Database user."
    config_param :password, :string, default: '', secret: true,
                 desc: "Database password."
    config_param :sslkey, :string, default: nil,
                 desc: "SSL key."
    config_param :sslcert, :string, default: nil,
                 desc: "SSL cert."
    config_param :sslca, :string, default: nil,
                 desc: "SSL CA."
    config_param :sslcapath, :string, default: nil,
                 desc: "SSL CA path."
    config_param :sslcipher, :string, default: nil,
                 desc: "SSL cipher."
    config_param :sslverify, :bool, default: nil,
                 desc: "SSL Verify Server Certificate."

    config_param :column_names, :string,
                 desc: "Bulk insert columns."
    config_param :key_names, :string, default: nil,
                 desc: <<-DESC
Value key names; ${time} is a placeholder for Time.at(time).strftime("%Y-%m-%d %H:%M:%S").
DESC
    config_param :json_key_names, :string, default: nil,
                 desc: "Key names whose values are stored as JSON."
    config_param :table, :string,
                 desc: "Bulk insert table."
    config_param :unixtimestamp_key_names, :string, default: nil,
                 desc: "Key names whose unix timestamp values are stored as datetime."

    config_param :on_duplicate_key_update, :bool, default: false,
                 desc: "Enable ON DUPLICATE KEY UPDATE."
    config_param :on_duplicate_update_keys, :string, default: nil,
                 desc: "ON DUPLICATE KEY UPDATE columns, comma separated."
    config_param :on_duplicate_update_custom_values, :string, default: nil,
                 desc: "Custom values for on_duplicate_update_keys, comma separated. Specify the column name to reuse the inserted value, or ${sql expression} to use a custom value."

    config_param :insert_ignore, :bool, default: false,
                 desc: "Use INSERT IGNORE."
    config_param :max_rows_per_insert, :integer, default: 0,
                 desc: "Maximum number of rows to insert in each statement."

    config_param :transaction_isolation_level, :enum,
                 list: [:read_uncommitted, :read_committed, :repeatable_read, :serializable],
                 default: nil,
                 desc: "Set transaction isolation level."

    attr_accessor :handler

    def initialize
      super
      require 'mysql2-cs-bind'
    end

    # Validate settings and pre-build the column list, the bind-placeholder
    # template, and the optional ON DUPLICATE KEY UPDATE clause.
    def configure(conf)
      compat_parameters_convert(conf, :buffer, :inject)
      super

      if @column_names.nil?
        fail Fluent::ConfigError, 'column_names MUST be specified, but it is missing'
      end

      if @on_duplicate_key_update
        if @on_duplicate_update_keys.nil?
          fail Fluent::ConfigError, 'on_duplicate_key_update is true, but on_duplicate_update_keys is nil!'
        end
        @on_duplicate_update_keys = @on_duplicate_update_keys.split(',')

        unless @on_duplicate_update_custom_values.nil?
          @on_duplicate_update_custom_values = @on_duplicate_update_custom_values.split(',')
          if @on_duplicate_update_custom_values.length != @on_duplicate_update_keys.length
            fail Fluent::ConfigError, <<-DESC
on_duplicate_update_keys and on_duplicate_update_custom_values must be the same length
DESC
          end
        end

        @on_duplicate_key_update_sql = ' ON DUPLICATE KEY UPDATE '
        updates = []
        @on_duplicate_update_keys.each_with_index do |update_column, i|
          if @on_duplicate_update_custom_values.nil? || @on_duplicate_update_custom_values[i] == "#{update_column}"
            # No custom value: reuse the value being inserted for this column.
            updates << "#{update_column} = VALUES(#{update_column})"
          else
            # Custom value given as ${...}: extract and escape the SQL expression.
            value = @on_duplicate_update_custom_values[i].to_s.match(/\${(.*)}/)[1]
            escape_value = Mysql2::Client.escape(value)
            updates << "#{update_column} = #{escape_value}"
          end
        end
        @on_duplicate_key_update_sql += updates.join(',')
      end

      @column_names = @column_names.split(',').collect(&:strip)
      @key_names = @key_names.nil? ? @column_names : @key_names.split(',').collect(&:strip)
      @values_template = "(#{@column_names.map { |key| '?' }.join(',')})"
      @insert_columns = @column_names.map { |x| "`#{x.to_s.gsub('`', '``')}`" }.join(',')
      @json_key_names = @json_key_names.split(',') if @json_key_names
      @unixtimestamp_key_names = @unixtimestamp_key_names.split(',') if @unixtimestamp_key_names
    end

    # Look up char/varchar widths for the target columns so oversized values can
    # be truncated before insert. Returns an array aligned with @column_names.
    def check_table_schema(database: @database, table: @table)
      _client = client(database)
      result = _client.xquery("SHOW COLUMNS FROM #{table}")
      max_lengths = []
      @column_names.each do |column|
        info = result.select { |x| x['Field'] == column }.first
        r = /(char|varchar)\(([\d]+)\)/
        begin
          max_length = info['Type'].scan(r)[0][1].to_i
        rescue
          max_length = nil
        end
        max_lengths << max_length
      end
      max_lengths
    ensure
      _client.close unless _client.nil?
    end

    def format(tag, time, record)
      record = inject_values_to_record(tag, time, record)
      [tag, time, record].to_msgpack
    end

    def formatted_to_msgpack_binary
      true
    end

    def multi_workers_ready?
      true
    end

    def client(database)
      Mysql2::Client.new(
        host: @host,
        port: @port,
        username: @username,
        password: @password,
        database: database,
        sslkey: @sslkey,
        sslcert: @sslcert,
        sslca: @sslca,
        sslcapath: @sslcapath,
        sslcipher: @sslcipher,
        sslverify: @sslverify,
        flags: Mysql2::Client::MULTI_STATEMENTS
      )
    end

    # Resolve buffer-chunk placeholders in the database and table names,
    # replacing dots with underscores to keep the identifiers valid.
    def expand_placeholders(metadata)
      database = extract_placeholders(@database, metadata).gsub('.', '_')
      table = extract_placeholders(@table, metadata).gsub('.', '_')
      return database, table
    end

    # Bind each buffered record into the VALUES template and issue bulk INSERT
    # statements, split into slices of max_rows_per_insert rows when configured.
    def write(chunk)
      database, table = expand_placeholders(chunk.metadata)
      max_lengths = check_table_schema(database: database, table: table)
      @handler = client(database)
      values = []
      chunk.msgpack_each do |tag, time, data|
        data = format_proc.call(tag, time, data, max_lengths)
        values << Mysql2::Client.pseudo_bind(@values_template, data)
      end
      @handler.query("SET SESSION TRANSACTION ISOLATION LEVEL #{transaction_isolation_level}") if @transaction_isolation_level
      slice_size = @max_rows_per_insert > 0 ? @max_rows_per_insert : values.length
      values.each_slice(slice_size) do |slice|
        sql = "INSERT #{@insert_ignore ? 'IGNORE' : ''} INTO #{table} (#{@insert_columns}) VALUES #{slice.join(',')}"
        sql += @on_duplicate_key_update_sql if @on_duplicate_key_update
        @handler.xquery(sql)
      end
      log.info "bulk insert values size (table: #{@table}) => #{values.size}"
      @handler.close
    end

    private

    # Convert one record into an ordered array of column values, handling the
    # ${time} placeholder, length truncation, JSON keys, and unix timestamp keys.
    def format_proc
      proc do |tag, time, record, max_lengths|
        values = []
        @key_names.each_with_index do |key, i|
          if key == '${time}'
            value = Time.at(time).strftime('%Y-%m-%d %H:%M:%S')
          else
            if max_lengths[i].nil? || record[key].nil?
              value = record[key]
            else
              value = record[key].to_s.slice(0, max_lengths[i])
            end

            if @json_key_names && @json_key_names.include?(key)
              value = Oj.dump(value)
            end

            if @unixtimestamp_key_names && @unixtimestamp_key_names.include?(key)
              value = Time.at(value).strftime('%Y-%m-%d %H:%M:%S')
            end
          end
          values << value
        end
        values
      end
    end

    # Map the transaction_isolation_level enum to its SQL keyword.
    def transaction_isolation_level
      case @transaction_isolation_level
      when :read_uncommitted
        "READ UNCOMMITTED"
      when :read_committed
        "READ COMMITTED"
      when :repeatable_read
        "REPEATABLE READ"
      when :serializable
        "SERIALIZABLE"
      end
    end
  end
end
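
# A minimal sketch of wiring this output into a Fluentd configuration; the tag
# pattern, host, database, credentials, table, and column/key values below are
# hypothetical and only illustrate the parameters defined above:
#
#   <match app.mysql.**>
#     @type mysql_bulk
#     host db.example.com
#     database example_db
#     username fluentd
#     password secret
#     table access_log
#     column_names id,user_name,status,created_at
#     key_names id,user,code,${time}
#     on_duplicate_key_update true
#     on_duplicate_update_keys status
#     <buffer>
#       flush_interval 10s
#     </buffer>
#   </match>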