lib/fluent/plugin/mqtt_proxy.rb in fluent-plugin-mqtt-io-0.4.4 vs lib/fluent/plugin/mqtt_proxy.rb in fluent-plugin-mqtt-io-0.5.0

- old
+ new

@@ -18,12 +18,14 @@ base.config_param :keep_alive, :integer, default: 15 base.desc 'Specify initial connection retry interval.' base.config_param :initial_interval, :integer, default: 1 base.desc 'Specify increasing ratio of connection retry interval.' base.config_param :retry_inc_ratio, :integer, default: 2 - base.desc 'Specify maximum connection retry interval.' + base.desc 'Specify the maximum connection retry interval.' base.config_param :max_retry_interval, :integer, default: 300 + base.desc 'Specify threshold of retry frequency as number of retries per minutes. Frequency is monitored per retry.' + base.config_param :max_retry_freq, :integer, default: 10 base.config_section :security, required: false, multi: false do ### User based authentication desc 'The username for authentication' config_param :username, :string, default: nil @@ -42,15 +44,24 @@ end end class MqttError < StandardError; end + class ExceedRetryFrequencyThresholdException < StandardError; end + def current_plugin_name # should be implemented end def start_proxy + # Start a thread from main thread for handling a thread generated + # by MQTT::Client#get (in_mqtt). Dummy thread is used for out_mqtt + # to keep the same implementation style. + @proxy_thread = thread_create("#{current_plugin_name}_proxy".to_sym, &method(:proxy)) + end + + def proxy log.debug "start mqtt proxy for #{current_plugin_name}" log.debug "start to connect mqtt broker #{@host}:#{@port}" opts = { host: @host, port: @port, @@ -66,99 +77,101 @@ opts[:cert_file] = @security.tls.cert_file opts[:key_file] = @security.tls.key_file end init_retry_interval + @retry_sequence = [] @client = MQTT::Client.new(opts) connect end def shutdown_proxy - @client.disconnect + disconnect end def init_retry_interval @retry_interval = @initial_interval end def increment_retry_interval - return @retry_interval if @retry_interval >= @max_retry_interval + return @max_retry_interval if @retry_interval >= @max_retry_interval @retry_interval = @retry_interval * @retry_inc_ratio end - def retry_connect(e, message) - if !@_retrying - log.error "#{message},#{e.class},#{e.message}" - log.error "Retry in #{@retry_interval} sec" - timer_execute("#{current_plugin_name}_connect".to_sym, @retry_interval, repeat: false, &method(:connect)) - @_retrying = true - increment_retry_interval - after_disconnection - @client.disconnect if @client.connected? + def update_retry_sequence(e) + @retry_sequence << {time: Time.now, error: "#{e.class}: #{e.message}"} + # delete old retry records + while @retry_sequence[0][:time] < Time.now - 60 + @retry_sequence.shift + end + end + + def check_retry_frequency + return if @retry_sequence.size <= 1 + if @retry_sequence.size > @max_retry_freq + log.error "Retry frequency threshold is exceeded: #{@retry_sequence}" + raise ExceedRetryFrequencyThresholdException end end - def kill_connect_thread - @connect_thread.kill if !@connect_thread.nil? + def retry_connect(e, message) + log.error "#{message},#{e.class},#{e.message}" + log.error "Retry in #{@retry_interval} sec" + update_retry_sequence(e) + check_retry_frequency + disconnect + sleep @retry_interval + increment_retry_interval + connect + # never reach here end - def after_disconnection + def disconnect # should be implemented - kill_connect_thread end + def terminate + end + def rescue_disconnection # Errors cannot be caught by fluentd core must be caught here. # Since fluentd core retries write method for buffered output # when it caught Errors during the previous write, # caughtable Error, e.g. MqttError, should be raised here. begin yield rescue MQTT::ProtocolException => e - # TODO: - # Thread created via fluentd thread API, e.g. thread_create, - # cannot catch MQTT::ProtocolException raised from @read_thread - # in ruby-mqtt. So, the current version uses plugin local thread - # @connect_thread to catch it. retry_connect(e, "Protocol error occurs in #{current_plugin_name}.") - raise MqttError, "Protocol error occurs." rescue Timeout::Error => e retry_connect(e, "Timeout error occurs in #{current_plugin_name}.") - raise Timeout::Error, "Timeout error occurs." rescue SystemCallError => e retry_connect(e, "System call error occurs in #{current_plugin_name}.") - raise SystemCallError, "System call error occurs." rescue StandardError=> e retry_connect(e, "The other error occurs in #{current_plugin_name}.") - raise StandardError, "The other error occurs." rescue MQTT::NotConnectedException=> e # Since MQTT::NotConnectedException is raised only on publish, # connection error should be catched before this error. # So, reconnection process is omitted for this Exception # to prevent waistful increment of retry interval. #log.error "MQTT not connected exception occurs.,#{e.class},#{e.message}" #retry_connect(e, "MQTT not connected exception occurs.") - raise MqttError, "MQTT not connected exception occurs in #{current_plugin_name}." + #raise MqttError, "MQTT not connected exception occurs in #{current_plugin_name}." end end def after_connection # should be implemented # returns thread instance for monitor thread to wait # for Exception raised by MQTT I/O end def connect - #@connect_thread = thread_create("#{current_plugin_name}_monitor".to_sym) do - @_retrying = false - @connect_thread = Thread.new do - rescue_disconnection do - @client.connect - log.debug "connected to mqtt broker #{@host}:#{@port} for #{current_plugin_name}" - init_retry_interval - thread = after_connection - thread.join - end + rescue_disconnection do + @client.connect + log.debug "connected to mqtt broker #{@host}:#{@port} for #{current_plugin_name}" + init_retry_interval + thread = after_connection + thread.join end end end end