lib/fluent/plugin/out_tdlog.rb in fluent-plugin-td-0.11.0.rc1 vs lib/fluent/plugin/out_tdlog.rb in fluent-plugin-td-1.0.0.rc1

- old (fluent-plugin-td-0.11.0.rc1)
+ new (fluent-plugin-td-1.0.0.rc1)

@@ -1,62 +1,57 @@
+require 'fileutils'
+require 'tempfile'
+require 'zlib'
+require 'stringio'
 require 'td-client'
-require 'fluent/output'
+
+require 'fluent/plugin/output'
 require 'fluent/plugin/td_plugin_version'
 
-module Fluent
-  class TreasureDataLogOutput < BufferedOutput
-    Plugin.register_output('tdlog', self)
+module Fluent::Plugin
+  class TreasureDataLogOutput < Output
+    Fluent::Plugin.register_output('tdlog', self)
 
     IMPORT_SIZE_LIMIT = 32 * 1024 * 1024
+    UPLOAD_EXT = 'msgpack.gz'.freeze
 
-    # To support log_level option since Fluentd v0.10.43
-    unless method_defined?(:log)
-      define_method(:log) { $log }
-    end
+    helpers :event_emitter, :compat_parameters
 
     config_param :apikey, :string, :secret => true
     config_param :auto_create_table, :bool, :default => true
+    config_param :database, :string, :default => nil
+    config_param :table, :string, :default => nil
     config_param :use_gzip_command, :bool, :default => false
 
     config_param :endpoint, :string, :default => TreasureData::API::NEW_DEFAULT_ENDPOINT
     config_param :use_ssl, :bool, :default => true
+    config_param :tmpdir, :string, :default => nil
+    config_param :http_proxy, :string, :default => nil
     config_param :connect_timeout, :integer, :default => nil
     config_param :read_timeout, :integer, :default => nil
     config_param :send_timeout, :integer, :default => nil
-    config_set_default :flush_interval, 300
+
+    config_section :buffer do
+      config_set_default :@type, 'file'
+      config_set_default :chunk_keys, ['tag']
+      config_set_default :flush_interval, 300
+      config_set_default :chunk_limit_size, IMPORT_SIZE_LIMIT
+    end
+
     def initialize
-      require 'fileutils'
-      require 'tempfile'
-      require 'zlib'
-      require 'net/http'
-      require 'json'
-      require 'cgi' # CGI.escape
-      require 'time' # Time#rfc2822
-      require 'digest/md5'
-      require 'stringio'
       super
 
-      @tmpdir = nil
       @key = nil
       @key_num_limit = 512 # TODO: Our one-time import has the restriction about the number of record keys.
       @record_size_limit = 32 * 1024 * 1024 # TODO
       @table_list = {}
       @empty_gz_data = TreasureData::API.create_empty_gz_data
       @user_agent = "fluent-plugin-td: #{TreasureDataPlugin::VERSION}".freeze
     end
 
     def configure(conf)
-      # overwrite default value of buffer_chunk_limit
-      unless conf.has_key?('buffer_chunk_limit')
-        conf['buffer_chunk_limit'] = IMPORT_SIZE_LIMIT
-      end
+      compat_parameters_convert(conf, :buffer, default_chunk_key: 'tag')
 
-      # v0.14 seems to have a bug of config_set_default: https://github.com/treasure-data/fluent-plugin-td/pull/22#issuecomment-230782005
-      unless conf.has_key?('buffer_type')
-        conf['buffer_type'] = 'file'
-      end
-
       super
 
       if @use_gzip_command
         require 'open3'
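The buffer defaults that configure() used to patch into the raw conf hash are now declared through config_section :buffer, with compat_parameters_convert translating old-style flat parameters for existing users. A minimal sketch of a v1-style configuration that spells these defaults out explicitly; the apikey value and buffer path are placeholders, and the buffer values shown simply mirror the config_set_default lines above:

    <match td.*.*>
      @type tdlog
      apikey YOUR_API_KEY                    # placeholder
      <buffer tag>
        @type file
        path /var/log/td-agent/buffer/td     # hypothetical path
        flush_interval 300s                  # matches config_set_default :flush_interval
        chunk_limit_size 32m                 # matches IMPORT_SIZE_LIMIT
      </buffer>
    </match>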
@@ -65,23 +60,20 @@
         rescue Errno::ENOENT
           raise ConfigError, "'gzip' utility must be in PATH for use_gzip_command parameter"
         end
       end
 
-      if conf.has_key?('tmpdir')
-        @tmpdir = conf['tmpdir']
-        FileUtils.mkdir_p(@tmpdir)
-      end
+      FileUtils.mkdir_p(@tmpdir) if @tmpdir
 
-      database = conf['database']
-      table = conf['table']
-      if database && table
-        validate_database_and_table_name(database, table, conf)
-        @key = "#{database}.#{table}"
+      if @database && @table
+        validate_database_and_table_name(@database, @table)
+        @key = "#{@database}.#{@table}"
+      else
+        unless @chunk_key_tag
+          raise Fluent::ConfigError, "'tag' must be included in <buffer ARG> when database and table are not specified"
+        end
       end
-
-      @http_proxy = conf['http_proxy']
     end
 
     def start
       super
@@ -91,76 +83,55 @@
       }
       @client = TreasureData::Client.new(@apikey, client_opts)
 
       if @key
         if @auto_create_table
-          database, table = @key.split('.',2)
-          ensure_database_and_table(database, table)
+          ensure_database_and_table(@database, @table)
         else
           check_table_exists(@key)
         end
       end
     end
 
-    def emit(tag, es, chain)
-      if @key
-        key = @key
-      else
-        database, table = tag.split('.')[-2,2]
-        database = TreasureData::API.normalize_database_name(database)
-        table = TreasureData::API.normalize_table_name(table)
-        key = "#{database}.#{table}"
-      end
+    def multi_workers_ready?
+      true
+    end
 
-      unless @auto_create_table
-        check_table_exists(key)
-      end
-
-      super(tag, es, chain, key)
+    def formatted_to_msgpack_binary
+      true
     end
 
-    def format_stream(tag, es)
-      out = MessagePack::Buffer.new
-      off = out.size # size is same as bytesize in ASCII-8BIT string
-      es.each { |time, record|
-        # Applications may send non-hash record or broken chunk may generate non-hash record so such records should be skipped
-        next unless record.is_a?(Hash)
+    def format(tag, time, record)
+      begin
+        record['time'] = time.to_i
+        record.delete(:time) if record.has_key?(:time)
 
-        begin
-          record['time'] = time.to_i
-          record.delete(:time) if record.has_key?(:time)
-
-          if record.size > @key_num_limit
-            raise "Too many number of keys (#{record.size} keys)" # TODO include summary of the record
-          end
-        rescue => e
-          # TODO (a) Remove the transaction mechanism of fluentd
-          #      or (b) keep transaction boundaries in in/out_forward.
-          #      This code disables the transaction mechanism (a).
-          log.warn "Skipped a broken record (#{e}): #{summarize_record(record)}"
-          log.warn_backtrace e.backtrace
-          next
+        if record.size > @key_num_limit
+          # TODO include summary of the record
+          router.emit_error_event(tag, time, record, RuntimeError.new("too many number of keys (#{record.size} keys)"))
+          return nil
         end
+      rescue => e
+        router.emit_error_event(tag, time, {'record' => record}, RuntimeError.new("skipped a broken record: #{e}"))
+        return nil
+      end
 
-        begin
-          record.to_msgpack(out)
-        rescue RangeError
-          # In msgpack v0.5, 'out' becomes String, not Buffer. This is not a problem because Buffer has a compatibility with String
-          out = out.to_s[0, off]
-          TreasureData::API.normalized_msgpack(record, out)
-        end
+      begin
+        result = record.to_msgpack
+      rescue RangeError
+        result = TreasureData::API.normalized_msgpack(record)
+      rescue => e
+        router.emit_error_event(tag, time, {'record' => record}, RuntimeError.new("can't convert record to msgpack: #{e}"))
+        return nil
+      end
 
-        noff = out.size
-        sz = noff - off
-        if sz > @record_size_limit
-          # TODO don't raise error
-          #raise "Size of a record too large (#{sz} bytes)" # TODO include summary of the record
-          log.warn "Size of a record too large (#{sz} bytes): #{summarize_record(record)}"
-        end
-        off = noff
-      }
-      out.to_s
+      if result.bytesize > @record_size_limit
+        # Don't raise error. Large size is not critical for streaming import
+        log.warn "Size of a record too large (#{result.bytesize} bytes): #{summarize_record(record)}"
+      end
+
+      result
     end
 
     def summarize_record(record)
       json = Yajl.dump(record)
       if json.size > 100
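Note that the new format no longer drops broken records with only a log warning: it hands them to router.emit_error_event, and Fluentd routes such events to the built-in @ERROR label. A minimal sketch for capturing those rejected records to disk, assuming a hypothetical output path:

    <label @ERROR>
      <match **>
        @type file
        path /var/log/td-agent/tdlog-error   # hypothetical path
      </match>
    </label>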
@@ -170,14 +141,21 @@
       end
     end
 
     def write(chunk)
       unique_id = chunk.unique_id
 
-      database, table = chunk.key.split('.', 2)
+      if @key
+        database, table = @database, @table
+      else
+        database, table = chunk.metadata.tag.split('.')[-2, 2]
+        database = TreasureData::API.normalize_database_name(database)
+        table = TreasureData::API.normalize_table_name(table)
+      end
+
       FileUtils.mkdir_p(@tmpdir) unless @tmpdir.nil?
-      f = Tempfile.new("tdlog-#{chunk.key}-", @tmpdir)
+      f = Tempfile.new("tdlog-#{chunk.metadata.tag}-", @tmpdir)
       f.binmode
 
       size = if @use_gzip_command
                gzip_by_command(chunk, f)
             else
@@ -189,15 +167,15 @@
       f.close(true) if f
     end
 
     # TODO: Share this routine with s3 compressors
     def gzip_by_command(chunk, tmp)
-      chunk_is_file = @buffer_type == 'file'
+      chunk_is_file = @buffer_config['@type'] == 'file'
       path = if chunk_is_file
                chunk.path
              else
-               w = Tempfile.new("gzip-tdlog-#{chunk.key}-", @tmpdir)
+               w = Tempfile.new("gzip-tdlog-#{chunk.metadata.tag}-", @tmpdir)
                w.binmode
                chunk.write_to(w)
                w.close
                w.path
              end
@@ -233,11 +211,11 @@
       log.trace { "uploading logs to Treasure Data database=#{database} table=#{table} (#{size}bytes)" }
 
       begin
         begin
           start = Time.now
-          @client.import(database, table, "msgpack.gz", io, size, unique_str)
+          @client.import(database, table, UPLOAD_EXT, io, size, unique_str)
         rescue TreasureData::NotFoundError => e
           unless @auto_create_table
             raise e
           end
           ensure_database_and_table(database, table)
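In the new write, the destination comes from chunk metadata rather than the removed chunk.key: when database and table are not set, the last two dot-separated components of the buffered tag are normalized into the database and table names, which is why 'tag' must be a chunk key. So with the classic tag layout, a record tagged td.my_db.my_table lands in database my_db, table my_table under a configuration like this (apikey is a placeholder):

    <match td.*.*>
      @type tdlog
      apikey YOUR_API_KEY   # placeholder
    </match>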
@@ -256,30 +234,30 @@
       unless @table_list.has_key?(key)
         database, table = key.split('.', 2)
         log.debug "checking whether table '#{database}.#{table}' exists on Treasure Data"
         io = StringIO.new(@empty_gz_data)
         begin
-          @client.import(database, table, "msgpack.gz", io, io.size)
+          @client.import(database, table, UPLOAD_EXT, io, io.size)
           @table_list[key] = true
         rescue TreasureData::NotFoundError
           raise "Table #{key.inspect} does not exist on Treasure Data. Use 'td table:create #{database} #{table}' to create it."
         rescue => e
           log.warn "failed to check existence of '#{database}.#{table}' table on Treasure Data", :error => e.inspect
           log.debug_backtrace e.backtrace
         end
       end
     end
 
-    def validate_database_and_table_name(database, table, conf)
+    def validate_database_and_table_name(database, table)
       begin
         TreasureData::API.validate_database_name(database)
       rescue => e
-        raise ConfigError, "Invalid database name #{database.inspect}: #{e}: #{conf}"
+        raise ConfigError, "Invalid database name #{database.inspect}: #{e}"
       end
       begin
         TreasureData::API.validate_table_name(table)
       rescue => e
-        raise ConfigError, "Invalid table name #{table.inspect}: #{e}: #{conf}"
+        raise ConfigError, "Invalid table name #{table.inspect}: #{e}"
       end
     end
 
     def ensure_database_and_table(database, table)
       log.info "Creating table #{database}.#{table} on TreasureData"
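Alternatively, the new top-level database and table parameters pin every chunk to a single destination, and validate_database_and_table_name now rejects invalid names at configure time without echoing the entire configuration into the error message. A sketch of the fixed-destination form, with placeholder apikey, database, and table names:

    <match td.access>
      @type tdlog
      apikey YOUR_API_KEY   # placeholder
      database my_db        # placeholder
      table access_log      # placeholder
    </match>

In this form the buffer does not need 'tag' among its chunk keys, since the destination never varies; the configure check above only enforces tag chunking when database and table are left unset.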