lib/fluent/plugin/out_mqtt_buf.rb in fluent-plugin-mqtt-io-0.2.1 vs lib/fluent/plugin/out_mqtt_buf.rb in fluent-plugin-mqtt-io-0.2.2

- old
+ new

@@ -9,11 +9,10 @@ # This method is called when an event reaches to Fluentd. # Convert the event to a raw string. def format(tag, time, record) [tag, time, record].to_msgpack - #[tag, time, record].to_json + "\n" end # This method is called every flush interval. Write the buffer chunk # to files or databases here. # 'chunk' is a buffer chunk that includes multiple formatted @@ -22,21 +21,16 @@ # # NOTE! This method is called by internal thread, not Fluentd's main thread. So IO wait doesn't affect other plugins. def write(chunk) messages = {} chunk.msgpack_each do |tag, time, record| - #$log.debug "Thread ID: #{Thread.current.object_id}, tag: #{tag}, time: #{format_time(time)}, record: #{record}" messages[tag] = [] if messages[tag].nil? - messages[tag] << record.merge(timestamp_hash(time)) + messages[tag] << add_send_time(record).merge(timestamp_hash(time)) end messages.keys.each do |tag| $log.debug "Thread ID: #{Thread.current.object_id}, topic: #{rewrite_tag(tag)}, message: #{messages[tag]}" @client.publish(rewrite_tag(tag), messages[tag].map {|m| m.to_json}.join(@bulk_trans_sep)) end $log.flush - #json = json_parse(chunk.open {|io| io.readline}) - #$log.debug "#{json[0]}, #{format_time(json[1])}, #{json[2]}" - #@client.publish(rewrite_tag(json[0]), (json[2].merge(timestamp_hash(json[1]))).to_json) - #$log.flush end end end