lib/fluent/plugin/out_tdlog.rb in fluent-plugin-td-0.10.23 vs lib/fluent/plugin/out_tdlog.rb in fluent-plugin-td-0.10.24

- old
+ new

@@ -181,13 +181,16 @@ super(tag, es, chain, key) end def format_stream(tag, es) - out = '' - off = out.bytesize + out = $use_msgpack_5 ? MessagePack::Buffer.new : ''.force_encoding('ASCII-8BIT') # this condition will be removed after removed msgpack v0.4 support + off = out.size # size is same as bytesize in ASCII-8BIT string es.each { |time, record| + # Applications may send non-hash record or broken chunk may generate non-hash record so such records should be skipped + next unless record.is_a?(Hash) + begin if @anonymizes @anonymizes.each_pair { |key, scr| if value = record[key] record[key] = scr.anonymize(value) @@ -203,30 +206,30 @@ end rescue => e # TODO (a) Remove the transaction mechanism of fluentd # or (b) keep transaction boundaries in in/out_forward. # This code disables the transaction mechanism (a). - log.error "#{e}: #{summarize_record(record)}" - log.error_backtrace e.backtrace + log.warn "Skipped a broken record (#{e}): #{summarize_record(record)}" + log.warn_backtrace e.backtrace next end begin record.to_msgpack(out) rescue RangeError TreasureData::API.normalized_msgpack(record, out) end - noff = out.bytesize + noff = out.size sz = noff - off if sz > @record_size_limit # TODO don't raise error #raise "Size of a record too large (#{sz} bytes)" # TODO include summary of the record log.warn "Size of a record too large (#{sz} bytes): #{summarize_record(record)}" end off = noff } - out + out.to_s end def summarize_record(record) json = Yajl.dump(record) if json.size > 100