lib/fluent/plugin/in_mqtt.rb in fluent-plugin-mqtt-io-0.2.1 vs lib/fluent/plugin/in_mqtt.rb in fluent-plugin-mqtt-io-0.2.2
- old
+ new
@@ -13,28 +13,22 @@
config_param :keep_alive, :integer, :default => 15
config_param :ssl, :bool, :default => nil
config_param :ca_file, :string, :default => nil
config_param :key_file, :string, :default => nil
config_param :cert_file, :string, :default => nil
+ config_param :recv_time, :bool, :default => false
+ config_param :recv_time_key, :string, :default => "recv_time"
require 'mqtt'
# Define `router` method of v0.12 to support v0.10 or earlier
unless method_defined?(:router)
define_method("router") { Fluent::Engine }
end
def configure(conf)
super
- @host ||= conf['host']
- @topic ||= conf['topic']
- @bulk_trans ||= conf['bulk_trans']
- @bulk_trans_sep ||= conf['bulk_trans_sep']
- @port ||= conf['port']
- @username ||= conf['username']
- @password ||= conf['password']
- @keep_alive ||= conf['keep_alive']
configure_parser(conf)
init_retry_interval
end
def configure_parser(conf)
@@ -49,13 +43,12 @@
def increment_retry_interval
@retry_interval = @retry_interval * 2
end
def sleep_retry_interval(e, message)
- $log.debug "#{message}"
- $log.debug "#{e.class}: #{e.message}"
- $log.debug "Retry in #{@retry_interval} sec"
+ $log.error "#{message},#{e.class},#{e.message}"
+ $log.error "Retry in #{@retry_interval} sec"
sleep @retry_interval
increment_retry_interval
end
def start
@@ -105,17 +98,26 @@
end
end
end
end
+ def add_recv_time(record)
+ if @recv_time
+ # recv_time is recorded in ms
+ record.merge({@recv_time_key => Time.now.instance_eval { self.to_i * 1000 + (usec/1000) }})
+ else
+ record
+ end
+ end
+
def parse(message)
@parser.parse(message) do |time, record|
if time.nil?
$log.debug "Since time_key field is nil, Fluent::Engine.now is used."
time = Fluent::Engine.now
end
- $log.debug "#{topic}, #{time}, #{record}"
- return [time, record]
+ $log.debug "#{topic}, #{time}, #{add_recv_time(record)}"
+ return [time, add_recv_time(record)]
end
end
def emit(topic, message)
begin