lib/fluent/plugin/out_tdlog.rb in fluent-plugin-td-0.11.0.rc1 vs lib/fluent/plugin/out_tdlog.rb in fluent-plugin-td-1.0.0.rc1
- old
+ new
@@ -1,62 +1,57 @@
+require 'fileutils'
+require 'tempfile'
+require 'zlib'
+require 'stringio'
require 'td-client'
-require 'fluent/output'
+
+require 'fluent/plugin/output'
require 'fluent/plugin/td_plugin_version'
-module Fluent
- class TreasureDataLogOutput < BufferedOutput
- Plugin.register_output('tdlog', self)
+module Fluent::Plugin
+ class TreasureDataLogOutput < Output
+ Fluent::Plugin.register_output('tdlog', self)
IMPORT_SIZE_LIMIT = 32 * 1024 * 1024
+ UPLOAD_EXT = 'msgpack.gz'.freeze
- # To support log_level option since Fluentd v0.10.43
- unless method_defined?(:log)
- define_method(:log) { $log }
- end
+ helpers :event_emitter, :compat_parameters
config_param :apikey, :string, :secret => true
config_param :auto_create_table, :bool, :default => true
+ config_param :database, :string, :default => nil
+ config_param :table, :string, :default => nil
config_param :use_gzip_command, :bool, :default => false
config_param :endpoint, :string, :default => TreasureData::API::NEW_DEFAULT_ENDPOINT
config_param :use_ssl, :bool, :default => true
+ config_param :tmpdir, :string, :default => nil
+ config_param :http_proxy, :string, :default => nil
config_param :connect_timeout, :integer, :default => nil
config_param :read_timeout, :integer, :default => nil
config_param :send_timeout, :integer, :default => nil
- config_set_default :flush_interval, 300
+ config_section :buffer do
+ config_set_default :@type, 'file'
+ config_set_default :chunk_keys, ['tag']
+ config_set_default :flush_interval, 300
+ config_set_default :chunk_limit_size, IMPORT_SIZE_LIMIT
+ end
+
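The <buffer> section above replaces the configure-time overrides of 0.11: chunks now default to the file buffer, are keyed by tag, flushed every 300 seconds, and capped at IMPORT_SIZE_LIMIT, so one chunk roughly corresponds to one upload. For reference, that cap works out to (illustration only):

    IMPORT_SIZE_LIMIT = 32 * 1024 * 1024   # => 33554432 bytes, i.e. 32 MiB per buffer chunk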
def initialize
- require 'fileutils'
- require 'tempfile'
- require 'zlib'
- require 'net/http'
- require 'json'
- require 'cgi' # CGI.escape
- require 'time' # Time#rfc2822
- require 'digest/md5'
- require 'stringio'
super
- @tmpdir = nil
@key = nil
@key_num_limit = 512 # TODO: Our one-time import has the restriction about the number of record keys.
@record_size_limit = 32 * 1024 * 1024 # TODO
@table_list = {}
@empty_gz_data = TreasureData::API.create_empty_gz_data
@user_agent = "fluent-plugin-td: #{TreasureDataPlugin::VERSION}".freeze
end
def configure(conf)
- # overwrite default value of buffer_chunk_limit
- unless conf.has_key?('buffer_chunk_limit')
- conf['buffer_chunk_limit'] = IMPORT_SIZE_LIMIT
- end
+ compat_parameters_convert(conf, :buffer, default_chunk_key: 'tag')
- # v0.14 seems to have a bug of config_set_default: https://github.com/treasure-data/fluent-plugin-td/pull/22#issuecomment-230782005
- unless conf.has_key?('buffer_type')
- conf['buffer_type'] = 'file'
- end
-
super
if @use_gzip_command
require 'open3'
@@ -65,23 +60,20 @@
rescue Errno::ENOENT
raise ConfigError, "'gzip' utility must be in PATH for use_gzip_command parameter"
end
end
- if conf.has_key?('tmpdir')
- @tmpdir = conf['tmpdir']
- FileUtils.mkdir_p(@tmpdir)
- end
+ FileUtils.mkdir_p(@tmpdir) if @tmpdir
- database = conf['database']
- table = conf['table']
- if database && table
- validate_database_and_table_name(database, table, conf)
- @key = "#{database}.#{table}"
+ if @database && @table
+ validate_database_and_table_name(@database, @table)
+ @key = "#{@database}.#{@table}"
+ else
+ unless @chunk_key_tag
+ raise Fluent::ConfigError, "'tag' must be included in <buffer ARG> when database and table are not specified"
+ end
end
-
- @http_proxy = conf['http_proxy']
end
def start
super
@@ -91,76 +83,55 @@
}
@client = TreasureData::Client.new(@apikey, client_opts)
if @key
if @auto_create_table
- database, table = @key.split('.',2)
- ensure_database_and_table(database, table)
+ ensure_database_and_table(@database, @table)
else
check_table_exists(@key)
end
end
end
- def emit(tag, es, chain)
- if @key
- key = @key
- else
- database, table = tag.split('.')[-2,2]
- database = TreasureData::API.normalize_database_name(database)
- table = TreasureData::API.normalize_table_name(table)
- key = "#{database}.#{table}"
- end
+ def multi_workers_ready?
+ true
+ end
- unless @auto_create_table
- check_table_exists(key)
- end
-
- super(tag, es, chain, key)
+ def formatted_to_msgpack_binary
+ true
end
- def format_stream(tag, es)
- out = MessagePack::Buffer.new
- off = out.size # size is same as bytesize in ASCII-8BIT string
- es.each { |time, record|
- # Applications may send non-hash record or broken chunk may generate non-hash record so such records should be skipped
- next unless record.is_a?(Hash)
+ def format(tag, time, record)
+ begin
+ record['time'] = time.to_i
+ record.delete(:time) if record.has_key?(:time)
- begin
- record['time'] = time.to_i
- record.delete(:time) if record.has_key?(:time)
-
- if record.size > @key_num_limit
- raise "Too many number of keys (#{record.size} keys)" # TODO include summary of the record
- end
- rescue => e
- # TODO (a) Remove the transaction mechanism of fluentd
- # or (b) keep transaction boundaries in in/out_forward.
- # This code disables the transaction mechanism (a).
- log.warn "Skipped a broken record (#{e}): #{summarize_record(record)}"
- log.warn_backtrace e.backtrace
- next
+ if record.size > @key_num_limit
+ # TODO include summary of the record
+ router.emit_error_event(tag, time, record, RuntimeError.new("too many number of keys (#{record.size} keys)"))
+ return nil
end
+ rescue => e
+ router.emit_error_event(tag, time, {'record' => record}, RuntimeError.new("skipped a broken record: #{e}"))
+ return nil
+ end
- begin
- record.to_msgpack(out)
- rescue RangeError
- # In msgpack v0.5, 'out' becomes String, not Buffer. This is not a problem because Buffer has a compatibility with String
- out = out.to_s[0, off]
- TreasureData::API.normalized_msgpack(record, out)
- end
+ begin
+ result = record.to_msgpack
+ rescue RangeError
+ result = TreasureData::API.normalized_msgpack(record)
+ rescue => e
+ router.emit_error_event(tag, time, {'record' => record}, RuntimeError.new("can't convert record to msgpack: #{e}"))
+ return nil
+ end
- noff = out.size
- sz = noff - off
- if sz > @record_size_limit
- # TODO don't raise error
- #raise "Size of a record too large (#{sz} bytes)" # TODO include summary of the record
- log.warn "Size of a record too large (#{sz} bytes): #{summarize_record(record)}"
- end
- off = noff
- }
- out.to_s
+ if result.bytesize > @record_size_limit
+ # Don't raise error. Large size is not critical for streaming import
+ log.warn "Size of a record too large (#{result.bytesize} bytes): #{summarize_record(record)}"
+ end
+
+ result
end
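With the v1 API the plugin formats events one at a time: format returns a single MessagePack map per record, and formatted_to_msgpack_binary tells the core that chunks already hold msgpack, so write only has to gzip and upload them. A minimal sketch of that per-record path, assuming only the msgpack gem (the record contents are made up):

    require 'msgpack'

    record = { 'message' => 'hello', 'status' => 200 }  # hypothetical event
    record['time'] = Time.now.to_i                      # the plugin injects event time as an integer

    encoded = record.to_msgpack                         # one msgpack map per event
    chunk_payload = encoded + { 'message' => 'world', 'time' => Time.now.to_i }.to_msgpack
    # a flushed chunk is just such a concatenation; write gzips it and uploads it as msgpack.gz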
def summarize_record(record)
json = Yajl.dump(record)
if json.size > 100
@@ -170,14 +141,21 @@
end
end
def write(chunk)
unique_id = chunk.unique_id
- database, table = chunk.key.split('.', 2)
+ if @key
+ database, table = @database, @table
+ else
+ database, table = chunk.metadata.tag.split('.')[-2, 2]
+ database = TreasureData::API.normalize_database_name(database)
+ table = TreasureData::API.normalize_table_name(table)
+ end
+
FileUtils.mkdir_p(@tmpdir) unless @tmpdir.nil?
- f = Tempfile.new("tdlog-#{chunk.key}-", @tmpdir)
+ f = Tempfile.new("tdlog-#{chunk.metadata.tag}-", @tmpdir)
f.binmode
size = if @use_gzip_command
gzip_by_command(chunk, f)
else
@@ -189,15 +167,15 @@
f.close(true) if f
end
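When database and table are not fixed in the configuration, write derives them from the chunk's tag: the last two dot-separated components are taken and normalized through td-client, exactly as the old emit method did per event. A rough illustration of that mapping (the tag is made up; the normalize_* helpers are the td-client calls used above):

    require 'td-client'

    tag = 'td.production_db.www_access'          # hypothetical tag routed to this output
    database, table = tag.split('.')[-2, 2]      # => ["production_db", "www_access"]
    database = TreasureData::API.normalize_database_name(database)
    table    = TreasureData::API.normalize_table_name(table)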
# TODO: Share this routine with s3 compressors
def gzip_by_command(chunk, tmp)
- chunk_is_file = @buffer_type == 'file'
+ chunk_is_file = @buffer_config['@type'] == 'file'
path = if chunk_is_file
chunk.path
else
- w = Tempfile.new("gzip-tdlog-#{chunk.key}-", @tmpdir)
+ w = Tempfile.new("gzip-tdlog-#{chunk.metadata.tag}-", @tmpdir)
w.binmode
chunk.write_to(w)
w.close
w.path
end
@@ -233,11 +211,11 @@
log.trace { "uploading logs to Treasure Data database=#{database} table=#{table} (#{size}bytes)" }
begin
begin
start = Time.now
- @client.import(database, table, "msgpack.gz", io, size, unique_str)
+ @client.import(database, table, UPLOAD_EXT, io, size, unique_str)
rescue TreasureData::NotFoundError => e
unless @auto_create_table
raise e
end
ensure_database_and_table(database, table)
@@ -256,30 +234,30 @@
unless @table_list.has_key?(key)
database, table = key.split('.', 2)
log.debug "checking whether table '#{database}.#{table}' exists on Treasure Data"
io = StringIO.new(@empty_gz_data)
begin
- @client.import(database, table, "msgpack.gz", io, io.size)
+ @client.import(database, table, UPLOAD_EXT, io, io.size)
@table_list[key] = true
rescue TreasureData::NotFoundError
raise "Table #{key.inspect} does not exist on Treasure Data. Use 'td table:create #{database} #{table}' to create it."
rescue => e
log.warn "failed to check existence of '#{database}.#{table}' table on Treasure Data", :error => e.inspect
log.debug_backtrace e.backtrace
end
end
end
- def validate_database_and_table_name(database, table, conf)
+ def validate_database_and_table_name(database, table)
begin
TreasureData::API.validate_database_name(database)
rescue => e
- raise ConfigError, "Invalid database name #{database.inspect}: #{e}: #{conf}"
+ raise ConfigError, "Invalid database name #{database.inspect}: #{e}"
end
begin
TreasureData::API.validate_table_name(table)
rescue => e
- raise ConfigError, "Invalid table name #{table.inspect}: #{e}: #{conf}"
+ raise ConfigError, "Invalid table name #{table.inspect}: #{e}"
end
end
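The name checks themselves are delegated to td-client; configure merely turns a failure into a ConfigError and no longer echoes the whole configuration in the message. A sketch of that delegation, with made-up names:

    require 'td-client'

    TreasureData::API.validate_database_name('access_logs')  # passes for a valid name
    begin
      TreasureData::API.validate_table_name('bad name!')     # invalid characters raise inside td-client
    rescue => e
      puts "rejected: #{e.message}"
    end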
def ensure_database_and_table(database, table)
log.info "Creating table #{database}.#{table} on TreasureData"