lib/fluent/plugin/in_mqtt.rb in fluent-plugin-mqtt-io-0.3.6 vs lib/fluent/plugin/in_mqtt.rb in fluent-plugin-mqtt-io-0.3.7

- old
+ new

@@ -91,26 +91,27 @@ @parser.parse(message) do |time, record| if time.nil? log.debug "Since time_key field is nil, Fluent::EventTime.now is used." time = Fluent::EventTime.now end - return [time, add_recv_time(record)] + return [time, record] end end def emit(topic, message) begin tag = topic.gsub("/","\.") time, record = parse(message) - log.debug "MqttInput#emit: #{tag}, #{time}, #{add_recv_time(record)}" if record.is_a?(Array) mes = Fluent::MultiEventStream.new record.each do |single_record| - mes.add(@parser.parse_time(single_record), single_record) + log.debug "MqttInput#emit: #{tag}, #{time}, #{add_recv_time(single_record)}" + mes.add(@parser.parse_time(single_record), add_recv_time(single_record)) end router.emit_stream(tag, mes) else - router.emit(tag, time, record) + log.debug "MqttInput#emit: #{tag}, #{time}, #{add_recv_time(record)}" + router.emit(tag, time, add_recv_time(record)) end rescue Exception => e log.error error: e.to_s log.debug_backtrace(e.backtrace) end