lib/fluent/plugin/mqtt_output_mixin.rb in fluent-plugin-mqtt-io-0.1.1 vs lib/fluent/plugin/mqtt_output_mixin.rb in fluent-plugin-mqtt-io-0.1.2
- old
+ new
@@ -6,10 +6,11 @@
def self.included(base)
base.config_param :port, :integer, :default => 1883
base.config_param :bind, :string, :default => '127.0.0.1'
base.config_param :username, :string, :default => nil
base.config_param :password, :string, :default => nil
+ base.config_param :keep_alive, :integer, :default => 15
base.config_param :ssl, :bool, :default => nil
base.config_param :ca_file, :string, :default => nil
base.config_param :key_file, :string, :default => nil
base.config_param :cert_file, :string, :default => nil
base.config_param :time_key, :string, :default => 'time'
@@ -30,28 +31,47 @@
# You can also refer raw parameter via conf[name].
@bind ||= conf['bind']
@port ||= conf['port']
@username ||= conf['username']
@password ||= conf['password']
+ @keep_alive ||= conf['keep_alive']
@time_key ||= conf['time_key']
@time_format ||= conf['time_format']
@topic_rewrite_pattern ||= conf['topic_rewrite_pattern']
@topic_rewrite_replacement ||= conf['topic_rewrite_replacement']
@bulk_trans_sep ||= conf['bulk_trans_sep']
+ init_retry_interval
end
+ def init_retry_interval
+ @retry_interval = 1
+ end
+
+ def increment_retry_interval
+ @retry_interval = @retry_interval * 2
+ end
+
+ def sleep_retry_interval(e, message)
+ $log.debug "#{message}"
+ $log.debug "#{e.class}: #{e.message}"
+ $log.debug "Retry in #{@retry_interval} sec"
+ sleep @retry_interval
+ increment_retry_interval
+ end
+
# This method is called when starting.
# Open sockets or files here.
def start
super
$log.debug "start mqtt #{@bind}"
opts = {
host: @bind,
port: @port,
username: @username,
- password: @password
+ password: @password,
+ keep_alive: @keep_alive
}
opts[:ssl] = @ssl if @ssl
opts[:ca_file] = @ca_file if @ca_file
opts[:cert_file] = @cert_file if @cert_file
opts[:key_file] = @key_file if @key_file
@@ -62,15 +82,22 @@
@connect_thread = Thread.new do
while (true)
begin
@client.disconnect if @client.connected?
@client.connect
+ init_retry_interval
sleep
- rescue MQTT::ProtocolException => pe
- $log.debug "Handling #{pe.class}: #{pe.message}"
+ rescue MQTT::ProtocolException => e
+ sleep_retry_interval(e, "Protocol error occurs.")
next
- rescue Timeout::Error => te
- $log.debug "Handling #{te.class}: #{te.message}"
+ rescue Timeout::Error => e
+ sleep_retry_interval(e, "Timeout error occurs.")
+ next
+ rescue SystemCallError => e
+ sleep_retry_interval(e, "System call error occurs.")
+ next
+ rescue StandardError=> e
+ sleep_retry_interval(e, "The other error occurs.")
next
end
end
end
end