lib/fluent/plugin/out_tdlog.rb in fluent-plugin-td-1.1.0 vs lib/fluent/plugin/out_tdlog.rb in fluent-plugin-td-1.2.0
- old (fluent-plugin-td-1.1.0)
+ new (fluent-plugin-td-1.2.0)
@@ -2,18 +2,20 @@
require 'tempfile'
require 'zlib'
require 'stringio'
require 'td-client'
+require 'fluent/error'
require 'fluent/plugin/output'
require 'fluent/plugin/td_plugin_version'
module Fluent::Plugin
class TreasureDataLogOutput < Output
Fluent::Plugin.register_output('tdlog', self)
IMPORT_SIZE_LIMIT = 32 * 1024 * 1024
+ IMPORT_RECORDS_LIMIT = 8096
UPLOAD_EXT = 'msgpack.gz'.freeze
helpers :event_emitter, :compat_parameters
config_param :apikey, :string, :secret => true
@@ -34,17 +36,18 @@
config_section :buffer do
config_set_default :@type, 'file'
config_set_default :chunk_keys, ['tag']
config_set_default :flush_interval, 300
config_set_default :chunk_limit_size, IMPORT_SIZE_LIMIT
+ config_set_default :chunk_limit_records, IMPORT_RECORDS_LIMIT
end
def initialize
super
@key = nil
- @key_num_limit = 512 # TODO: Our one-time import has the restriction about the number of record keys.
- @record_size_limit = 32 * 1024 * 1024 # TODO
+ @key_num_limit = 512
+ @record_size_limit = 32 * 1024 * 1024
@table_list = {}
@empty_gz_data = TreasureData::API.create_empty_gz_data
@user_agent = "fluent-plugin-td: #{TreasureDataPlugin::VERSION}".freeze
end
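1.2.0 adds IMPORT_RECORDS_LIMIT and uses it, alongside the existing IMPORT_SIZE_LIMIT, as a buffer default: a chunk is considered full once it holds 8096 records or 32 MiB of data, whichever limit is reached first. A minimal sketch of the resulting defaults as a plain Ruby hash (illustrative only, not plugin code):

    effective_buffer_defaults = {
      '@type'               => 'file',
      'chunk_keys'          => ['tag'],
      'flush_interval'      => 300,               # seconds
      'chunk_limit_size'    => 32 * 1024 * 1024,  # IMPORT_SIZE_LIMIT
      'chunk_limit_records' => 8096               # IMPORT_RECORDS_LIMIT, new in 1.2.0
    }

Any of these can still be overridden by an explicit <buffer> section in the user's configuration; the constants only set the defaults.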
@@ -105,11 +108,10 @@
begin
record['time'] = time.to_i
record.delete(:time) if record.has_key?(:time)
if record.size > @key_num_limit
- # TODO include summary of the record
router.emit_error_event(tag, time, record, RuntimeError.new("too many number of keys (#{record.size} keys)"))
return nil
end
rescue => e
router.emit_error_event(tag, time, {'record' => record}, RuntimeError.new("skipped a broken record: #{e}"))
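The guard itself is unchanged in 1.2.0 (only the stale TODO comment is dropped): records with more than @key_num_limit (512) top-level keys are routed to the error stream instead of being uploaded. A standalone sketch of the same check, with a hypothetical record and helper that are not part of the plugin:

    KEY_NUM_LIMIT = 512

    def too_many_keys?(record)
      record.size > KEY_NUM_LIMIT
    end

    wide_record = (1..600).map { |i| ["col#{i}", i] }.to_h
    too_many_keys?(wide_record)  # => true; the plugin emits such records as error events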
@@ -166,11 +168,10 @@
upload(database, table, f, size, unique_id)
ensure
f.close(true) if f
end
- # TODO: Share this routine with s3 compressors
def gzip_by_command(chunk, tmp)
chunk_is_file = @buffer_config['@type'] == 'file'
path = if chunk_is_file
chunk.path
else
@@ -181,14 +182,12 @@
w.path
end
res = system "gzip -c #{path} > #{tmp.path}"
unless res
log.warn "failed to execute gzip command. Fallback to GzipWriter. status = #{$?}"
- begin
- tmp.truncate(0)
- return gzip_by_writer(chunk, tmp)
- end
+ tmp.truncate(0)
+ return gzip_by_writer(chunk, tmp)
end
File.size(tmp.path)
ensure
unless chunk_is_file
w.close(true) rescue nil
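The change here only flattens a redundant begin/end around the fallback; the behaviour is the same: compress the chunk with the external gzip command and fall back to Zlib::GzipWriter if the command fails. A simplified, self-contained sketch of that pattern, working file-to-file and ignoring the plugin's chunk handling (the source path is a placeholder):

    require 'zlib'
    require 'tempfile'

    def gzip_with_fallback(src_path, tmp)
      return File.size(tmp.path) if system("gzip -c #{src_path} > #{tmp.path}")

      # gzip command missing or failed: start over with the pure-Ruby writer
      tmp.truncate(0)
      Zlib::GzipWriter.open(tmp.path) { |gz| gz.write(File.binread(src_path)) }
      File.size(tmp.path)
    end

    tmp = Tempfile.new('gz')
    gzip_with_fallback('/path/to/chunk', tmp)  # => compressed size in bytes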
@@ -209,22 +208,26 @@
def upload(database, table, io, size, unique_id)
unique_str = unique_id.unpack('C*').map { |x| "%02x" % x }.join
log.trace { "uploading logs to Treasure Data database=#{database} table=#{table} (#{size}bytes)" }
+ start = Time.now
begin
begin
- start = Time.now
@client.import(database, table, UPLOAD_EXT, io, size, unique_str)
- rescue TreasureData::NotFoundError => e
+ rescue TreasureData::NotFoundError
unless @auto_create_table
- raise e
+ raise
end
ensure_database_and_table(database, table)
io.pos = 0
retry
end
+ rescue TreasureData::TooManyRequestsError
+ raise
+ rescue TreasureData::ClientError => e
+ raise Fluent::UnrecoverableError.new(e.message)
rescue => e
elapsed = Time.now - start
ne = RuntimeError.new("Failed to upload to Treasure Data '#{database}.#{table}' table: #{e.inspect} (#{size} bytes; #{elapsed} seconds)")
ne.set_backtrace(e.backtrace)
raise ne
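This is the main behavioural change in 1.2.0: TreasureData::TooManyRequestsError is re-raised untouched so Fluentd's normal buffer retry applies, while any other TreasureData::ClientError is converted into Fluent::UnrecoverableError so Fluentd stops retrying a chunk the API will never accept. The rate-limit error is rescued ahead of the broader ClientError clause so it keeps its retryable handling. A simplified sketch of just that classification; the helper name is ours, not the plugin's:

    require 'td-client'
    require 'fluent/error'

    def classify_upload_error(error)
      case error
      when TreasureData::TooManyRequestsError
        raise error                                      # rate limited: retrying later can succeed
      when TreasureData::ClientError
        raise Fluent::UnrecoverableError, error.message  # bad request: retrying cannot help
      else
        raise error                                      # anything else keeps default handling
      end
    end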
@@ -267,9 +270,10 @@
@api_client.create_log_table(database, table)
rescue TreasureData::NotFoundError
@api_client.create_database(database)
@api_client.create_log_table(database, table)
rescue TreasureData::AlreadyExistsError
+ # ignored
end
end
end
end
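The last hunk only adds a comment to an intentionally empty rescue: with auto_create_table enabled, AlreadyExistsError typically means another worker created the database or table first, so creation is treated as a success and the upload is retried as usual. A standalone sketch of that idempotent create; the method name and client argument are ours, not the plugin's:

    def ensure_log_table(client, database, table)
      client.create_log_table(database, table)
    rescue TreasureData::NotFoundError
      # the database does not exist yet: create it, then the table
      client.create_database(database)
      client.create_log_table(database, table)
    rescue TreasureData::AlreadyExistsError
      # created concurrently by someone else; treat as success
    end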