lib/fluent/plugin/out_mqtt.rb in fluent-plugin-mqtt-io-0.3.5 vs lib/fluent/plugin/out_mqtt.rb in fluent-plugin-mqtt-io-0.3.6
- old
+ new
@@ -4,10 +4,11 @@
require 'fluent/plugin/mqtt_proxy'
module Fluent::Plugin
class MqttOutput < Output
include MqttProxy
+ include Fluent::TimeMixin::Formatter
Fluent::Plugin.register_output('mqtt', self)
helpers :compat_parameters, :formatter, :inject
@@ -19,10 +20,14 @@
config_section :monitor, required: false, multi: false do
desc 'Recording send time for monitoring.'
config_param :send_time, :bool, default: false
desc 'Recording key name of send time for monitoring.'
config_param :send_time_key, :string, default: "send_time"
+ desc 'Specify time type of send_time (string, unixtime, float).'
+ config_param :time_type, :string, default: 'string'
+ desc 'Specify time format of send_time (e.g. %FT%T.%N%:z).'
+ config_param :time_format, :string, default: nil
end
# This method is called before starting.
# 'conf' is a Hash that includes configuration parameters.
# If the configuration is invalid, raise Fluent::ConfigError.
@@ -30,10 +35,15 @@
super
compat_parameters_convert(conf, :formatter, :inject, :buffer, default_chunk_key: "time")
formatter_config = conf.elements(name: 'format').first
@formatter = formatter_create(conf: formatter_config)
@has_buffer_section = conf.elements(name: 'buffer').size > 0
+ if !@monitor.nil?
+ @send_time_formatter = time_formatter_create(
+ type: @monitor.time_type.to_sym, format: @monitor.time_format
+ )
+ end
end
def rewrite_tag(tag)
if @topic_rewrite_pattern.nil?
tag.gsub("\.", "/")
@@ -70,32 +80,32 @@
def current_plugin_name
:out_mqtt
end
def add_send_time(record)
- if @send_time
+ if !@monitor.nil? && @monitor.send_time
# send_time is recorded in ms
- record.merge({"#{@send_time_key}": Fluent::EventTime.now})
+ record.merge({"#{@monitor.send_time_key}": @send_time_formatter.format(Fluent::EventTime.now)})
else
record
end
end
def publish_event_stream(tag, es)
if es.class == Fluent::OneEventStream
es = inject_values_to_event_stream(tag, es)
es.each do |time, record|
- log.debug "#{rewrite_tag(tag)}, #{time}, #{add_send_time(record)}"
+ log.debug "MqttOutput#publish_event_stream: #{rewrite_tag(tag)}, #{time}, #{add_send_time(record)}"
rescue_disconnection do
@client.publish(rewrite_tag(tag), @formatter.format(tag, time, add_send_time(record)))
end
end
else
es = inject_values_to_event_stream(tag, es)
array = []
es.each do |time, record|
- log.debug "#{rewrite_tag(tag)}, #{time}, #{add_send_time(record)}"
+ log.debug "MqttOutput#publish_event_stream: #{rewrite_tag(tag)}, #{time}, #{add_send_time(record)}"
array << add_send_time(record)
end
rescue_disconnection do
@client.publish(rewrite_tag(tag), @formatter.format(tag, Fluent::EventTime.now, array))
end
@@ -118,10 +128,10 @@
def write(chunk)
return if chunk.empty?
chunk.each do |tag, time, record|
rescue_disconnection do
- log.debug "#{rewrite_tag(rewrite_tag(tag))}, #{time}, #{add_send_time(record)}"
+ log.debug "MqttOutput#write: #{rewrite_tag(rewrite_tag(tag))}, #{time}, #{add_send_time(record)}"
@client.publish(rewrite_tag(tag), @formatter.format(tag, time, add_send_time(record)))
end
end
end
end