lib/fluent/plugin/in_mqtt.rb in fluent-plugin-mqtt-io-0.3.6 vs lib/fluent/plugin/in_mqtt.rb in fluent-plugin-mqtt-io-0.3.7
- old
+ new
@@ -91,26 +91,27 @@
@parser.parse(message) do |time, record|
if time.nil?
log.debug "Since time_key field is nil, Fluent::EventTime.now is used."
time = Fluent::EventTime.now
end
- return [time, add_recv_time(record)]
+ return [time, record]
end
end
def emit(topic, message)
begin
tag = topic.gsub("/","\.")
time, record = parse(message)
- log.debug "MqttInput#emit: #{tag}, #{time}, #{add_recv_time(record)}"
if record.is_a?(Array)
mes = Fluent::MultiEventStream.new
record.each do |single_record|
- mes.add(@parser.parse_time(single_record), single_record)
+ log.debug "MqttInput#emit: #{tag}, #{time}, #{add_recv_time(single_record)}"
+ mes.add(@parser.parse_time(single_record), add_recv_time(single_record))
end
router.emit_stream(tag, mes)
else
- router.emit(tag, time, record)
+ log.debug "MqttInput#emit: #{tag}, #{time}, #{add_recv_time(record)}"
+ router.emit(tag, time, add_recv_time(record))
end
rescue Exception => e
log.error error: e.to_s
log.debug_backtrace(e.backtrace)
end