lib/fluent/plugin/mqtt_output_mixin.rb in fluent-plugin-mqtt-io-0.1.1 vs lib/fluent/plugin/mqtt_output_mixin.rb in fluent-plugin-mqtt-io-0.1.2

- old
+ new

@@ -6,10 +6,11 @@ def self.included(base) base.config_param :port, :integer, :default => 1883 base.config_param :bind, :string, :default => '127.0.0.1' base.config_param :username, :string, :default => nil base.config_param :password, :string, :default => nil + base.config_param :keep_alive, :integer, :default => 15 base.config_param :ssl, :bool, :default => nil base.config_param :ca_file, :string, :default => nil base.config_param :key_file, :string, :default => nil base.config_param :cert_file, :string, :default => nil base.config_param :time_key, :string, :default => 'time' @@ -30,28 +31,47 @@ # You can also refer raw parameter via conf[name]. @bind ||= conf['bind'] @port ||= conf['port'] @username ||= conf['username'] @password ||= conf['password'] + @keep_alive ||= conf['keep_alive'] @time_key ||= conf['time_key'] @time_format ||= conf['time_format'] @topic_rewrite_pattern ||= conf['topic_rewrite_pattern'] @topic_rewrite_replacement ||= conf['topic_rewrite_replacement'] @bulk_trans_sep ||= conf['bulk_trans_sep'] + init_retry_interval end + def init_retry_interval + @retry_interval = 1 + end + + def increment_retry_interval + @retry_interval = @retry_interval * 2 + end + + def sleep_retry_interval(e, message) + $log.debug "#{message}" + $log.debug "#{e.class}: #{e.message}" + $log.debug "Retry in #{@retry_interval} sec" + sleep @retry_interval + increment_retry_interval + end + # This method is called when starting. # Open sockets or files here. def start super $log.debug "start mqtt #{@bind}" opts = { host: @bind, port: @port, username: @username, - password: @password + password: @password, + keep_alive: @keep_alive } opts[:ssl] = @ssl if @ssl opts[:ca_file] = @ca_file if @ca_file opts[:cert_file] = @cert_file if @cert_file opts[:key_file] = @key_file if @key_file @@ -62,15 +82,22 @@ @connect_thread = Thread.new do while (true) begin @client.disconnect if @client.connected? @client.connect + init_retry_interval sleep - rescue MQTT::ProtocolException => pe - $log.debug "Handling #{pe.class}: #{pe.message}" + rescue MQTT::ProtocolException => e + sleep_retry_interval(e, "Protocol error occurs.") next - rescue Timeout::Error => te - $log.debug "Handling #{te.class}: #{te.message}" + rescue Timeout::Error => e + sleep_retry_interval(e, "Timeout error occurs.") + next + rescue SystemCallError => e + sleep_retry_interval(e, "System call error occurs.") + next + rescue StandardError=> e + sleep_retry_interval(e, "The other error occurs.") next end end end end