lib/fluent/plugin/mqtt_output_mixin.rb in fluent-plugin-mqtt-io-0.2.1 vs lib/fluent/plugin/mqtt_output_mixin.rb in fluent-plugin-mqtt-io-0.2.2

- old
+ new

@@ -16,31 +16,21 @@ base.config_param :time_key, :string, :default => 'time' base.config_param :time_format, :string, :default => nil base.config_param :topic_rewrite_pattern, :string, :default => nil base.config_param :topic_rewrite_replacement, :string, :default => nil base.config_param :bulk_trans_sep, :string, :default => "\t" + base.config_param :send_time, :bool, :default => false + base.config_param :send_time_key, :string, :default => "send_time" end require 'mqtt' # This method is called before starting. # 'conf' is a Hash that includes configuration parameters. # If the configuration is invalid, raise Fluent::ConfigError. def configure(conf) super - - # You can also refer raw parameter via conf[name]. - @host ||= conf['host'] - @port ||= conf['port'] - @username ||= conf['username'] - @password ||= conf['password'] - @keep_alive ||= conf['keep_alive'] - @time_key ||= conf['time_key'] - @time_format ||= conf['time_format'] - @topic_rewrite_pattern ||= conf['topic_rewrite_pattern'] - @topic_rewrite_replacement ||= conf['topic_rewrite_replacement'] - @bulk_trans_sep ||= conf['bulk_trans_sep'] init_retry_interval end def init_retry_interval @retry_interval = 1 @@ -49,13 +39,12 @@ def increment_retry_interval @retry_interval = @retry_interval * 2 end def sleep_retry_interval(e, message) - $log.debug "#{message}" - $log.debug "#{e.class}: #{e.message}" - $log.debug "Retry in #{@retry_interval} sec" + $log.error "#{message},#{e.class},#{e.message}" + $log.error "Retry in #{@retry_interval} sec" sleep @retry_interval increment_retry_interval end # This method is called when starting. @@ -130,9 +119,19 @@ {} else {@time_key => format_time(time)} end end + + def add_send_time(record) + if @send_time + # send_time is recorded in ms + record.merge({@send_time_key => Time.now.instance_eval { self.to_i * 1000 + (usec/1000) }}) + else + record + end + end + def rewrite_tag(tag) if @topic_rewrite_pattern.nil? tag.gsub("\.", "/") else