lib/fluent/plugin/out_mqtt.rb in fluent-plugin-mqtt-io-0.3.5 vs lib/fluent/plugin/out_mqtt.rb in fluent-plugin-mqtt-io-0.3.6

- old
+ new

@@ -4,10 +4,11 @@ require 'fluent/plugin/mqtt_proxy' module Fluent::Plugin class MqttOutput < Output include MqttProxy + include Fluent::TimeMixin::Formatter Fluent::Plugin.register_output('mqtt', self) helpers :compat_parameters, :formatter, :inject @@ -19,10 +20,14 @@ config_section :monitor, required: false, multi: false do desc 'Recording send time for monitoring.' config_param :send_time, :bool, default: false desc 'Recording key name of send time for monitoring.' config_param :send_time_key, :string, default: "send_time" + desc 'Specify time type of send_time (string, unixtime, float).' + config_param :time_type, :string, default: 'string' + desc 'Specify time format of send_time (e.g. %FT%T.%N%:z).' + config_param :time_format, :string, default: nil end # This method is called before starting. # 'conf' is a Hash that includes configuration parameters. # If the configuration is invalid, raise Fluent::ConfigError. @@ -30,10 +35,15 @@ super compat_parameters_convert(conf, :formatter, :inject, :buffer, default_chunk_key: "time") formatter_config = conf.elements(name: 'format').first @formatter = formatter_create(conf: formatter_config) @has_buffer_section = conf.elements(name: 'buffer').size > 0 + if !@monitor.nil? + @send_time_formatter = time_formatter_create( + type: @monitor.time_type.to_sym, format: @monitor.time_format + ) + end end def rewrite_tag(tag) if @topic_rewrite_pattern.nil? tag.gsub("\.", "/") @@ -70,32 +80,32 @@ def current_plugin_name :out_mqtt end def add_send_time(record) - if @send_time + if !@monitor.nil? && @monitor.send_time # send_time is recorded in ms - record.merge({"#{@send_time_key}": Fluent::EventTime.now}) + record.merge({"#{@monitor.send_time_key}": @send_time_formatter.format(Fluent::EventTime.now)}) else record end end def publish_event_stream(tag, es) if es.class == Fluent::OneEventStream es = inject_values_to_event_stream(tag, es) es.each do |time, record| - log.debug "#{rewrite_tag(tag)}, #{time}, #{add_send_time(record)}" + log.debug "MqttOutput#publish_event_stream: #{rewrite_tag(tag)}, #{time}, #{add_send_time(record)}" rescue_disconnection do @client.publish(rewrite_tag(tag), @formatter.format(tag, time, add_send_time(record))) end end else es = inject_values_to_event_stream(tag, es) array = [] es.each do |time, record| - log.debug "#{rewrite_tag(tag)}, #{time}, #{add_send_time(record)}" + log.debug "MqttOutput#publish_event_stream: #{rewrite_tag(tag)}, #{time}, #{add_send_time(record)}" array << add_send_time(record) end rescue_disconnection do @client.publish(rewrite_tag(tag), @formatter.format(tag, Fluent::EventTime.now, array)) end @@ -118,10 +128,10 @@ def write(chunk) return if chunk.empty? chunk.each do |tag, time, record| rescue_disconnection do - log.debug "#{rewrite_tag(rewrite_tag(tag))}, #{time}, #{add_send_time(record)}" + log.debug "MqttOutput#write: #{rewrite_tag(rewrite_tag(tag))}, #{time}, #{add_send_time(record)}" @client.publish(rewrite_tag(tag), @formatter.format(tag, time, add_send_time(record))) end end end end