lib/fluent/plugin/mqtt_output_mixin.rb in fluent-plugin-mqtt-io-0.2.1 vs lib/fluent/plugin/mqtt_output_mixin.rb in fluent-plugin-mqtt-io-0.2.2
- old
+ new
@@ -16,31 +16,21 @@
base.config_param :time_key, :string, :default => 'time'
base.config_param :time_format, :string, :default => nil
base.config_param :topic_rewrite_pattern, :string, :default => nil
base.config_param :topic_rewrite_replacement, :string, :default => nil
base.config_param :bulk_trans_sep, :string, :default => "\t"
+ base.config_param :send_time, :bool, :default => false
+ base.config_param :send_time_key, :string, :default => "send_time"
end
require 'mqtt'
# This method is called before starting.
# 'conf' is a Hash that includes configuration parameters.
# If the configuration is invalid, raise Fluent::ConfigError.
def configure(conf)
super
-
- # You can also refer raw parameter via conf[name].
- @host ||= conf['host']
- @port ||= conf['port']
- @username ||= conf['username']
- @password ||= conf['password']
- @keep_alive ||= conf['keep_alive']
- @time_key ||= conf['time_key']
- @time_format ||= conf['time_format']
- @topic_rewrite_pattern ||= conf['topic_rewrite_pattern']
- @topic_rewrite_replacement ||= conf['topic_rewrite_replacement']
- @bulk_trans_sep ||= conf['bulk_trans_sep']
init_retry_interval
end
def init_retry_interval
@retry_interval = 1
@@ -49,13 +39,12 @@
def increment_retry_interval
@retry_interval = @retry_interval * 2
end
def sleep_retry_interval(e, message)
- $log.debug "#{message}"
- $log.debug "#{e.class}: #{e.message}"
- $log.debug "Retry in #{@retry_interval} sec"
+ $log.error "#{message},#{e.class},#{e.message}"
+ $log.error "Retry in #{@retry_interval} sec"
sleep @retry_interval
increment_retry_interval
end
# This method is called when starting.
@@ -130,9 +119,19 @@
{}
else
{@time_key => format_time(time)}
end
end
+
+ def add_send_time(record)
+ if @send_time
+ # send_time is recorded in ms
+ record.merge({@send_time_key => Time.now.instance_eval { self.to_i * 1000 + (usec/1000) }})
+ else
+ record
+ end
+ end
+
def rewrite_tag(tag)
if @topic_rewrite_pattern.nil?
tag.gsub("\.", "/")
else