lib/fluent/plugin/in_mqtt.rb in fluent-plugin-mqtt-io-0.1.1 vs lib/fluent/plugin/in_mqtt.rb in fluent-plugin-mqtt-io-0.1.2

- old
+ new

@@ -8,10 +8,11 @@ config_param :format, :string, :default => 'json' config_param :bulk_trans, :bool, :default => true config_param :bulk_trans_sep, :string, :default => "\t" config_param :username, :string, :default => nil config_param :password, :string, :default => nil + config_param :keep_alive, :integer, :default => 15 config_param :ssl, :bool, :default => nil config_param :ca_file, :string, :default => nil config_param :key_file, :string, :default => nil config_param :cert_file, :string, :default => nil @@ -29,26 +30,44 @@ @bulk_trans ||= conf['bulk_trans'] @bulk_trans_sep ||= conf['bulk_trans_sep'] @port ||= conf['port'] @username ||= conf['username'] @password ||= conf['password'] + @keep_alive ||= conf['keep_alive'] configure_parser(conf) - @reconn_interval = 2 + init_retry_interval end def configure_parser(conf) @parser = Plugin.new_parser(conf['format']) @parser.configure(conf) end + + def init_retry_interval + @retry_interval = 1 + end + def increment_retry_interval + @retry_interval = @retry_interval * 2 + end + + def sleep_retry_interval(e, message) + $log.debug "#{message}" + $log.debug "#{e.class}: #{e.message}" + $log.debug "Retry in #{@retry_interval} sec" + sleep @retry_interval + increment_retry_interval + end + def start $log.debug "start mqtt #{@bind}" opts = { host: @bind, port: @port, username: @username, - password: @password + password: @password, + keep_alive: @keep_alive } opts[:ssl] = @ssl if @ssl opts[:ca_file] = @ca_file if @ca_file opts[:cert_file] = @cert_file if @cert_file opts[:key_file] = @key_file if @key_file @@ -67,25 +86,23 @@ @get_thread = Thread.new do @client.get do |topic, message| emit(topic, message) end end - @reconn_interval = 2 + init_retry_interval sleep - rescue MQTT::ProtocolException => pe - $log.debug "Handling #{pe.class}: #{pe.message}" + rescue MQTT::ProtocolException => e + sleep_retry_interval(e, "Protocol error occurs.") next - rescue Timeout::Error => te - $log.debug "Handling #{te.class}: #{te.message}" + rescue Timeout::Error => e + sleep_retry_interval(e, "Timeout error occurs.") next - rescue Errno::ECONNREFUSED => ce - $log.debug "Server seems to be down... Retry in #{@reconn_interval} sec #{ce.class}: #{ce.message}" - sleep @reconn_interval - @reconn_interval = @reconn_interval * 2 + rescue SystemCallError => e + sleep_retry_interval(e, "System call error occurs.") next - rescue => oe - $log.debug "Other Exception #{oe.class}: #{oe.message}" - exit 1 + rescue StandardError=> e + sleep_retry_interval(e, "The other error occurs.") + next end end end end