lib/fluent/plugin/out_tdlog.rb in fluent-plugin-td-0.10.23 vs lib/fluent/plugin/out_tdlog.rb in fluent-plugin-td-0.10.24
- old
+ new
@@ -181,13 +181,16 @@
super(tag, es, chain, key)
end
def format_stream(tag, es)
- out = ''
- off = out.bytesize
+ out = $use_msgpack_5 ? MessagePack::Buffer.new : ''.force_encoding('ASCII-8BIT') # this condition will be removed after removed msgpack v0.4 support
+ off = out.size # size is same as bytesize in ASCII-8BIT string
es.each { |time, record|
+ # Applications may send non-hash record or broken chunk may generate non-hash record so such records should be skipped
+ next unless record.is_a?(Hash)
+
begin
if @anonymizes
@anonymizes.each_pair { |key, scr|
if value = record[key]
record[key] = scr.anonymize(value)
@@ -203,30 +206,30 @@
end
rescue => e
# TODO (a) Remove the transaction mechanism of fluentd
# or (b) keep transaction boundaries in in/out_forward.
# This code disables the transaction mechanism (a).
- log.error "#{e}: #{summarize_record(record)}"
- log.error_backtrace e.backtrace
+ log.warn "Skipped a broken record (#{e}): #{summarize_record(record)}"
+ log.warn_backtrace e.backtrace
next
end
begin
record.to_msgpack(out)
rescue RangeError
TreasureData::API.normalized_msgpack(record, out)
end
- noff = out.bytesize
+ noff = out.size
sz = noff - off
if sz > @record_size_limit
# TODO don't raise error
#raise "Size of a record too large (#{sz} bytes)" # TODO include summary of the record
log.warn "Size of a record too large (#{sz} bytes): #{summarize_record(record)}"
end
off = noff
}
- out
+ out.to_s
end
def summarize_record(record)
json = Yajl.dump(record)
if json.size > 100