lib/fluent/plugin/mqtt_proxy.rb in fluent-plugin-mqtt-io-0.4.0 vs lib/fluent/plugin/mqtt_proxy.rb in fluent-plugin-mqtt-io-0.4.1

- old
+ new

@@ -90,17 +90,22 @@ increment_retry_interval after_disconnection @client.disconnect if @client.connected? end + def kill_connect_thread + @connect_thread.kill if !@connect_thread.nil? + end + def after_disconnection # should be implemented + kill_connect_thread end - def rescue_disconnection(*block) + def rescue_disconnection begin - yield *block + yield rescue MQTT::ProtocolException => e # TODO: # Currently MQTT::ProtocolException cannot be caught during @client.get # and @client.publish. The reason must be investigated... retry_connect(e, "Protocol error occurs.") @@ -108,20 +113,27 @@ retry_connect(e, "Timeout error occurs.") rescue SystemCallError => e retry_connect(e, "System call error occurs.") rescue StandardError=> e retry_connect(e, "The other error occurs.") + rescue MQTT::NotConnectedException=> e + # Since MQTT::NotConnectedException is raised only on publish, + # connection error should be catched before this error. + # So, reconnection process is omitted for this Exception + # to prevent waistful increment of retry interval. + log.error "MQTT not connected exception occurs.,#{e.class},#{e.message}" end end def after_connection # should be implemented # returns thread instance for monitor thread to wait # for Exception raised by MQTT I/O end def connect - thread_create("#{current_plugin_name}_monitor".to_sym) do + #@connect_thread = thread_create("#{current_plugin_name}_monitor".to_sym) do + @connect_thread = Thread.new do rescue_disconnection do @client.connect log.debug "connected to mqtt broker #{@host}:#{@port} for #{current_plugin_name}" init_retry_interval thread = after_connection