lib/fluent/plugin/out_mqtt_buf.rb in fluent-plugin-mqtt-io-0.0.6 vs lib/fluent/plugin/out_mqtt_buf.rb in fluent-plugin-mqtt-io-0.1.0
- old
+ new
@@ -1,30 +1,42 @@
module Fluent
- class MqttOutput < BufferedOutput
+ class MqttBufferedOutput < BufferedOutput
require 'fluent/plugin/mqtt_output_mixin'
include Fluent::MqttOutputMixin
# First, register the plugin. NAME is the name of this plugin
# and identifies the plugin in the configuration file.
Fluent::Plugin.register_output('mqtt_buf', self)
# This method is called when an event reaches to Fluentd.
# Convert the event to a raw string.
def format(tag, time, record)
- [tag, time, record].to_json + "\n"
+ [tag, time, record].to_msgpack
+ #[tag, time, record].to_json + "\n"
end
# This method is called every flush interval. Write the buffer chunk
# to files or databases here.
# 'chunk' is a buffer chunk that includes multiple formatted
# events. You can use 'data = chunk.read' to get all events and
# 'chunk.open {|io| ... }' to get IO objects.
#
# NOTE! This method is called by internal thread, not Fluentd's main thread. So IO wait doesn't affect other plugins.
def write(chunk)
- json = json_parse(chunk.read)
- $log.debug "#{json[0]}, #{format_time(json[1])}, #{json[2]}"
- @connect.publish(rewrite_tag(json[0]), (json[2].merge(timestamp_hash(json[1]))).to_json)
+ messages = {}
+ chunk.msgpack_each do |tag, time, record|
+ #$log.debug "Thread ID: #{Thread.current.object_id}, tag: #{tag}, time: #{format_time(time)}, record: #{record}"
+ messages[tag] = [] if messages[tag].nil?
+ messages[tag] << record.merge(timestamp_hash(time))
+ end
+ messages.keys.each do |tag|
+ $log.debug "Thread ID: #{Thread.current.object_id}, topic: #{rewrite_tag(tag)}, message: #{messages[tag]}"
+ @client.publish(rewrite_tag(tag), messages[tag].map {|m| m.to_json}.join(@bulk_trans_sep))
+ end
$log.flush
+ #json = json_parse(chunk.open {|io| io.readline})
+ #$log.debug "#{json[0]}, #{format_time(json[1])}, #{json[2]}"
+ #@client.publish(rewrite_tag(json[0]), (json[2].merge(timestamp_hash(json[1]))).to_json)
+ #$log.flush
end
end
end