lib/fluent/plugin/out_tdlog.rb in fluent-plugin-td-1.1.0 vs lib/fluent/plugin/out_tdlog.rb in fluent-plugin-td-1.2.0

- old
+ new

@@ -2,18 +2,20 @@ require 'tempfile' require 'zlib' require 'stringio' require 'td-client' +require 'fluent/error' require 'fluent/plugin/output' require 'fluent/plugin/td_plugin_version' module Fluent::Plugin class TreasureDataLogOutput < Output Fluent::Plugin.register_output('tdlog', self) IMPORT_SIZE_LIMIT = 32 * 1024 * 1024 + IMPORT_RECORDS_LIMIT = 8096 UPLOAD_EXT = 'msgpack.gz'.freeze helpers :event_emitter, :compat_parameters config_param :apikey, :string, :secret => true @@ -34,17 +36,18 @@ config_section :buffer do config_set_default :@type, 'file' config_set_default :chunk_keys, ['tag'] config_set_default :flush_interval, 300 config_set_default :chunk_limit_size, IMPORT_SIZE_LIMIT + config_set_default :chunk_limit_records, IMPORT_RECORDS_LIMIT end def initialize super @key = nil - @key_num_limit = 512 # TODO: Our one-time import has the restriction about the number of record keys. - @record_size_limit = 32 * 1024 * 1024 # TODO + @key_num_limit = 512 + @record_size_limit = 32 * 1024 * 1024 @table_list = {} @empty_gz_data = TreasureData::API.create_empty_gz_data @user_agent = "fluent-plugin-td: #{TreasureDataPlugin::VERSION}".freeze end @@ -105,11 +108,10 @@ begin record['time'] = time.to_i record.delete(:time) if record.has_key?(:time) if record.size > @key_num_limit - # TODO include summary of the record router.emit_error_event(tag, time, record, RuntimeError.new("too many number of keys (#{record.size} keys)")) return nil end rescue => e router.emit_error_event(tag, time, {'record' => record}, RuntimeError.new("skipped a broken record: #{e}")) @@ -166,11 +168,10 @@ upload(database, table, f, size, unique_id) ensure f.close(true) if f end - # TODO: Share this routine with s3 compressors def gzip_by_command(chunk, tmp) chunk_is_file = @buffer_config['@type'] == 'file' path = if chunk_is_file chunk.path else @@ -181,14 +182,12 @@ w.path end res = system "gzip -c #{path} > #{tmp.path}" unless res log.warn "failed to execute gzip command. Fallback to GzipWriter. status = #{$?}" - begin - tmp.truncate(0) - return gzip_by_writer(chunk, tmp) - end + tmp.truncate(0) + return gzip_by_writer(chunk, tmp) end File.size(tmp.path) ensure unless chunk_is_file w.close(true) rescue nil @@ -209,22 +208,26 @@ def upload(database, table, io, size, unique_id) unique_str = unique_id.unpack('C*').map { |x| "%02x" % x }.join log.trace { "uploading logs to Treasure Data database=#{database} table=#{table} (#{size}bytes)" } + start = Time.now begin begin - start = Time.now @client.import(database, table, UPLOAD_EXT, io, size, unique_str) - rescue TreasureData::NotFoundError => e + rescue TreasureData::NotFoundError unless @auto_create_table - raise e + raise end ensure_database_and_table(database, table) io.pos = 0 retry end + rescue TreasureData::TooManyRequestsError + raise + rescue TreasureData::ClientError => e + raise Fluent::UnrecoverableError.new(e.message) rescue => e elapsed = Time.now - start ne = RuntimeError.new("Failed to upload to Treasure Data '#{database}.#{table}' table: #{e.inspect} (#{size} bytes; #{elapsed} seconds)") ne.set_backtrace(e.backtrace) raise ne @@ -267,9 +270,10 @@ @api_client.create_log_table(database, table) rescue TreasureData::NotFoundError @api_client.create_database(database) @api_client.create_log_table(database, table) rescue TreasureData::AlreadyExistsError + # ignored end end end end