lib/fluent/plugin/in_mqtt.rb in fluent-plugin-mqtt-io-0.1.1 vs lib/fluent/plugin/in_mqtt.rb in fluent-plugin-mqtt-io-0.1.2
- old
+ new
@@ -8,10 +8,11 @@
config_param :format, :string, :default => 'json'
config_param :bulk_trans, :bool, :default => true
config_param :bulk_trans_sep, :string, :default => "\t"
config_param :username, :string, :default => nil
config_param :password, :string, :default => nil
+ config_param :keep_alive, :integer, :default => 15
config_param :ssl, :bool, :default => nil
config_param :ca_file, :string, :default => nil
config_param :key_file, :string, :default => nil
config_param :cert_file, :string, :default => nil
@@ -29,26 +30,44 @@
@bulk_trans ||= conf['bulk_trans']
@bulk_trans_sep ||= conf['bulk_trans_sep']
@port ||= conf['port']
@username ||= conf['username']
@password ||= conf['password']
+ @keep_alive ||= conf['keep_alive']
configure_parser(conf)
- @reconn_interval = 2
+ init_retry_interval
end
def configure_parser(conf)
@parser = Plugin.new_parser(conf['format'])
@parser.configure(conf)
end
+
+ def init_retry_interval
+ @retry_interval = 1
+ end
+ def increment_retry_interval
+ @retry_interval = @retry_interval * 2
+ end
+
+ def sleep_retry_interval(e, message)
+ $log.debug "#{message}"
+ $log.debug "#{e.class}: #{e.message}"
+ $log.debug "Retry in #{@retry_interval} sec"
+ sleep @retry_interval
+ increment_retry_interval
+ end
+
def start
$log.debug "start mqtt #{@bind}"
opts = {
host: @bind,
port: @port,
username: @username,
- password: @password
+ password: @password,
+ keep_alive: @keep_alive
}
opts[:ssl] = @ssl if @ssl
opts[:ca_file] = @ca_file if @ca_file
opts[:cert_file] = @cert_file if @cert_file
opts[:key_file] = @key_file if @key_file
@@ -67,25 +86,23 @@
@get_thread = Thread.new do
@client.get do |topic, message|
emit(topic, message)
end
end
- @reconn_interval = 2
+ init_retry_interval
sleep
- rescue MQTT::ProtocolException => pe
- $log.debug "Handling #{pe.class}: #{pe.message}"
+ rescue MQTT::ProtocolException => e
+ sleep_retry_interval(e, "Protocol error occurs.")
next
- rescue Timeout::Error => te
- $log.debug "Handling #{te.class}: #{te.message}"
+ rescue Timeout::Error => e
+ sleep_retry_interval(e, "Timeout error occurs.")
next
- rescue Errno::ECONNREFUSED => ce
- $log.debug "Server seems to be down... Retry in #{@reconn_interval} sec #{ce.class}: #{ce.message}"
- sleep @reconn_interval
- @reconn_interval = @reconn_interval * 2
+ rescue SystemCallError => e
+ sleep_retry_interval(e, "System call error occurs.")
next
- rescue => oe
- $log.debug "Other Exception #{oe.class}: #{oe.message}"
- exit 1
+ rescue StandardError=> e
+ sleep_retry_interval(e, "The other error occurs.")
+ next
end
end
end
end