lib/fluent/plugin/mqtt_proxy.rb in fluent-plugin-mqtt-io-0.4.4 vs lib/fluent/plugin/mqtt_proxy.rb in fluent-plugin-mqtt-io-0.5.0
- old
+ new
@@ -18,12 +18,14 @@
base.config_param :keep_alive, :integer, default: 15
base.desc 'Specify initial connection retry interval.'
base.config_param :initial_interval, :integer, default: 1
base.desc 'Specify increasing ratio of connection retry interval.'
base.config_param :retry_inc_ratio, :integer, default: 2
- base.desc 'Specify maximum connection retry interval.'
+ base.desc 'Specify the maximum connection retry interval.'
base.config_param :max_retry_interval, :integer, default: 300
+ base.desc 'Specify threshold of retry frequency as number of retries per minutes. Frequency is monitored per retry.'
+ base.config_param :max_retry_freq, :integer, default: 10
base.config_section :security, required: false, multi: false do
### User based authentication
desc 'The username for authentication'
config_param :username, :string, default: nil
@@ -42,15 +44,24 @@
end
end
class MqttError < StandardError; end
+ class ExceedRetryFrequencyThresholdException < StandardError; end
+
def current_plugin_name
# should be implemented
end
def start_proxy
+ # Start a thread from main thread for handling a thread generated
+ # by MQTT::Client#get (in_mqtt). Dummy thread is used for out_mqtt
+ # to keep the same implementation style.
+ @proxy_thread = thread_create("#{current_plugin_name}_proxy".to_sym, &method(:proxy))
+ end
+
+ def proxy
log.debug "start mqtt proxy for #{current_plugin_name}"
log.debug "start to connect mqtt broker #{@host}:#{@port}"
opts = {
host: @host,
port: @port,
@@ -66,99 +77,101 @@
opts[:cert_file] = @security.tls.cert_file
opts[:key_file] = @security.tls.key_file
end
init_retry_interval
+ @retry_sequence = []
@client = MQTT::Client.new(opts)
connect
end
def shutdown_proxy
- @client.disconnect
+ disconnect
end
def init_retry_interval
@retry_interval = @initial_interval
end
def increment_retry_interval
- return @retry_interval if @retry_interval >= @max_retry_interval
+ return @max_retry_interval if @retry_interval >= @max_retry_interval
@retry_interval = @retry_interval * @retry_inc_ratio
end
- def retry_connect(e, message)
- if !@_retrying
- log.error "#{message},#{e.class},#{e.message}"
- log.error "Retry in #{@retry_interval} sec"
- timer_execute("#{current_plugin_name}_connect".to_sym, @retry_interval, repeat: false, &method(:connect))
- @_retrying = true
- increment_retry_interval
- after_disconnection
- @client.disconnect if @client.connected?
+ def update_retry_sequence(e)
+ @retry_sequence << {time: Time.now, error: "#{e.class}: #{e.message}"}
+ # delete old retry records
+ while @retry_sequence[0][:time] < Time.now - 60
+ @retry_sequence.shift
+ end
+ end
+
+ def check_retry_frequency
+ return if @retry_sequence.size <= 1
+ if @retry_sequence.size > @max_retry_freq
+ log.error "Retry frequency threshold is exceeded: #{@retry_sequence}"
+ raise ExceedRetryFrequencyThresholdException
end
end
- def kill_connect_thread
- @connect_thread.kill if !@connect_thread.nil?
+ def retry_connect(e, message)
+ log.error "#{message},#{e.class},#{e.message}"
+ log.error "Retry in #{@retry_interval} sec"
+ update_retry_sequence(e)
+ check_retry_frequency
+ disconnect
+ sleep @retry_interval
+ increment_retry_interval
+ connect
+ # never reach here
end
- def after_disconnection
+ def disconnect
# should be implemented
- kill_connect_thread
end
+ def terminate
+ end
+
def rescue_disconnection
# Errors cannot be caught by fluentd core must be caught here.
# Since fluentd core retries write method for buffered output
# when it caught Errors during the previous write,
# caughtable Error, e.g. MqttError, should be raised here.
begin
yield
rescue MQTT::ProtocolException => e
- # TODO:
- # Thread created via fluentd thread API, e.g. thread_create,
- # cannot catch MQTT::ProtocolException raised from @read_thread
- # in ruby-mqtt. So, the current version uses plugin local thread
- # @connect_thread to catch it.
retry_connect(e, "Protocol error occurs in #{current_plugin_name}.")
- raise MqttError, "Protocol error occurs."
rescue Timeout::Error => e
retry_connect(e, "Timeout error occurs in #{current_plugin_name}.")
- raise Timeout::Error, "Timeout error occurs."
rescue SystemCallError => e
retry_connect(e, "System call error occurs in #{current_plugin_name}.")
- raise SystemCallError, "System call error occurs."
rescue StandardError=> e
retry_connect(e, "The other error occurs in #{current_plugin_name}.")
- raise StandardError, "The other error occurs."
rescue MQTT::NotConnectedException=> e
# Since MQTT::NotConnectedException is raised only on publish,
# connection error should be catched before this error.
# So, reconnection process is omitted for this Exception
# to prevent waistful increment of retry interval.
#log.error "MQTT not connected exception occurs.,#{e.class},#{e.message}"
#retry_connect(e, "MQTT not connected exception occurs.")
- raise MqttError, "MQTT not connected exception occurs in #{current_plugin_name}."
+ #raise MqttError, "MQTT not connected exception occurs in #{current_plugin_name}."
end
end
def after_connection
# should be implemented
# returns thread instance for monitor thread to wait
# for Exception raised by MQTT I/O
end
def connect
- #@connect_thread = thread_create("#{current_plugin_name}_monitor".to_sym) do
- @_retrying = false
- @connect_thread = Thread.new do
- rescue_disconnection do
- @client.connect
- log.debug "connected to mqtt broker #{@host}:#{@port} for #{current_plugin_name}"
- init_retry_interval
- thread = after_connection
- thread.join
- end
+ rescue_disconnection do
+ @client.connect
+ log.debug "connected to mqtt broker #{@host}:#{@port} for #{current_plugin_name}"
+ init_retry_interval
+ thread = after_connection
+ thread.join
end
end
end
end