lib/fluent/plugin/in_mqtt.rb in fluent-plugin-mqtt-io-0.2.1 vs lib/fluent/plugin/in_mqtt.rb in fluent-plugin-mqtt-io-0.2.2

- old
+ new

@@ -13,28 +13,22 @@ config_param :keep_alive, :integer, :default => 15 config_param :ssl, :bool, :default => nil config_param :ca_file, :string, :default => nil config_param :key_file, :string, :default => nil config_param :cert_file, :string, :default => nil + config_param :recv_time, :bool, :default => false + config_param :recv_time_key, :string, :default => "recv_time" require 'mqtt' # Define `router` method of v0.12 to support v0.10 or earlier unless method_defined?(:router) define_method("router") { Fluent::Engine } end def configure(conf) super - @host ||= conf['host'] - @topic ||= conf['topic'] - @bulk_trans ||= conf['bulk_trans'] - @bulk_trans_sep ||= conf['bulk_trans_sep'] - @port ||= conf['port'] - @username ||= conf['username'] - @password ||= conf['password'] - @keep_alive ||= conf['keep_alive'] configure_parser(conf) init_retry_interval end def configure_parser(conf) @@ -49,13 +43,12 @@ def increment_retry_interval @retry_interval = @retry_interval * 2 end def sleep_retry_interval(e, message) - $log.debug "#{message}" - $log.debug "#{e.class}: #{e.message}" - $log.debug "Retry in #{@retry_interval} sec" + $log.error "#{message},#{e.class},#{e.message}" + $log.error "Retry in #{@retry_interval} sec" sleep @retry_interval increment_retry_interval end def start @@ -105,17 +98,26 @@ end end end end + def add_recv_time(record) + if @recv_time + # recv_time is recorded in ms + record.merge({@recv_time_key => Time.now.instance_eval { self.to_i * 1000 + (usec/1000) }}) + else + record + end + end + def parse(message) @parser.parse(message) do |time, record| if time.nil? $log.debug "Since time_key field is nil, Fluent::Engine.now is used." time = Fluent::Engine.now end - $log.debug "#{topic}, #{time}, #{record}" - return [time, record] + $log.debug "#{topic}, #{time}, #{add_recv_time(record)}" + return [time, add_recv_time(record)] end end def emit(topic, message) begin