lib/fluent/plugin/in_mqtt.rb in fluent-plugin-mqtt-io-0.2.3 vs lib/fluent/plugin/in_mqtt.rb in fluent-plugin-mqtt-io-0.3.0

- old
+ new

@@ -1,45 +1,75 @@ -module Fluent +require 'fluent/plugin/input' +require 'fluent/event' +require 'fluent/time' +require 'mqtt' + +module Fluent::Plugin class MqttInput < Input - Plugin.register_input('mqtt', self) + Fluent::Plugin.register_input('mqtt', self) - config_param :port, :integer, :default => 1883 + helpers :thread, :compat_parameters, :parser + + MQTT_PORT = 1883 + + desc 'The address to connect to.' config_param :host, :string, :default => '127.0.0.1' + desc 'The port to connect to.' + config_param :port, :integer, :default => MQTT_PORT + desc 'The topic to subscribe.' config_param :topic, :string, :default => '#' + desc 'The format to receive.' config_param :format, :string, :default => 'json' - config_param :bulk_trans, :bool, :default => true - config_param :bulk_trans_sep, :string, :default => "\t" - config_param :username, :string, :default => nil - config_param :password, :string, :default => nil + desc 'Specify keep alive interval.' config_param :keep_alive, :integer, :default => 15 - config_param :ssl, :bool, :default => nil - config_param :ca_file, :string, :default => nil - config_param :key_file, :string, :default => nil - config_param :cert_file, :string, :default => nil - config_param :recv_time, :bool, :default => false - config_param :recv_time_key, :string, :default => "recv_time" + desc 'Specify initial interval for reconnection.' config_param :initial_interval, :integer, :default => 1 + desc 'Specify increasing ratio of reconnection interval.' config_param :retry_inc_ratio, :integer, :default => 2 - require 'mqtt' + # bulk_trans is deprecated + # multiple entries must be inputted as an Array + #config_param :bulk_trans, :bool, :default => true + #config_param :bulk_trans_sep, :string, :default => "\t" - # Define `router` method of v0.12 to support v0.10 or earlier - unless method_defined?(:router) - define_method("router") { Fluent::Engine } + config_section :security, required: false, multi: false do + ### User based authentication + desc 'The username for authentication' + config_param :username, :string, :default => nil + desc 'The password for authentication' + config_param :password, :string, :default => nil + desc 'Use TLS or not.' + config_param :use_tls, :bool, :default => nil + config_section :tls, required: false, multi: true do + desc 'Specify TLS ca file.' + config_param :ca_file, :string, :default => nil + desc 'Specify TLS key file.' + config_param :key_file, :string, :default => nil + desc 'Specify TLS cert file.' + config_param :cert_file, :string, :default => nil + end end + config_section :monitor, required: false, multi: false do + desc 'Record received time into message or not.' + config_param :recv_time, :bool, :default => false + desc 'Specify the attribute name of received time.' + config_param :recv_time_key, :string, :default => "recv_time" + end + def configure(conf) super configure_parser(conf) init_retry_interval end def configure_parser(conf) - @parser = Plugin.new_parser(conf['format']) - @parser.configure(conf) + compat_parameters_convert(conf, :parser) + parser_config = conf.elements('parse').first + @parser = parser_create(conf: parser_config) end - + def init_retry_interval @retry_interval = @initial_interval end def increment_retry_interval @@ -56,82 +86,90 @@ def start $log.debug "start mqtt #{@host}" opts = { host: @host, port: @port, - username: @username, - password: @password, keep_alive: @keep_alive } - opts[:ssl] = @ssl if @ssl - opts[:ca_file] = @ca_file if @ca_file - opts[:cert_file] = @cert_file if @cert_file - opts[:key_file] = @key_file if @key_file + opts[:username] = @security.username if @security.respond_to?(:username) + opts[:password] = @security.password if @security.respond_to?(:password) + if @security.respond_to?(:use_tls) && @security.use_tls + opts[:ssl] = @security.use_tls + opts[:ca_file] = @security.tls.ca_file + opts[:cert_file] = @security.tls.cert_file + opts[:key_file] = @security.tls.key_file + end # In order to handle Exception raised from reading Thread # in MQTT::Client caused by network disconnection (during read_byte), # @connect_thread generates connection. @client = MQTT::Client.new(opts) - @connect_thread = Thread.new do - while (true) - begin - @client.disconnect if @client.connected? - @client.connect - @client.subscribe(@topic) - @get_thread.kill if !@get_thread.nil? && @get_thread.alive? - @get_thread = Thread.new do - @client.get do |topic, message| - emit(topic, message) - end + @get_thread = nil + @connect_thread = Thread.new(&method(:connect_loop)) + end + + def connect_loop + while (true) + begin + @get_thread.kill if !@get_thread.nil? && @get_thread.alive? + @client.disconnect if @client.connected? + @client.connect + @client.subscribe(@topic) + @get_thread = Thread.new do + @client.get do |topic, message| + emit(topic, message) end - init_retry_interval - sleep - rescue MQTT::ProtocolException => e - sleep_retry_interval(e, "Protocol error occurs.") - next - rescue Timeout::Error => e - sleep_retry_interval(e, "Timeout error occurs.") - next - rescue SystemCallError => e - sleep_retry_interval(e, "System call error occurs.") - next - rescue StandardError=> e - sleep_retry_interval(e, "The other error occurs.") - next end + init_retry_interval + sleep + rescue MQTT::ProtocolException => e + sleep_retry_interval(e, "Protocol error occurs.") + next + rescue Timeout::Error => e + sleep_retry_interval(e, "Timeout error occurs.") + next + rescue SystemCallError => e + sleep_retry_interval(e, "System call error occurs.") + next + rescue StandardError=> e + sleep_retry_interval(e, "The other error occurs.") + next end end end def add_recv_time(record) if @recv_time # recv_time is recorded in ms - record.merge({@recv_time_key => Time.now.instance_eval { self.to_i * 1000 + (usec/1000) }}) + record.merge({@recv_time_key => Fluent::EventTime.now}) else record end end def parse(message) @parser.parse(message) do |time, record| if time.nil? - $log.debug "Since time_key field is nil, Fluent::Engine.now is used." - time = Fluent::Engine.now + $log.debug "Since time_key field is nil, Fluent::EventTime.now is used." + time = Fluent::EventTime.now end $log.debug "#{topic}, #{time}, #{add_recv_time(record)}" return [time, add_recv_time(record)] end end def emit(topic, message) begin - topic.gsub!("/","\.") - if @bulk_trans - message.split(@bulk_trans_sep).each do |m| - router.emit(topic, *parse(m)) + tag = topic.gsub("/","\.") + time, record = parse(message) + if record.is_a?(Array) + mes = Fluent::MultiEventStream.new + record.each do |single_record| + mes.add(@parser.parse_time(single_record), single_record) end + router.emit_stream(tag, mes) else - router.emit(topic, *parse(message)) + router.emit(tag, time, record) end rescue Exception => e $log.error :error => e.to_s $log.debug_backtrace(e.backtrace) end