lib/fluent/plugin/mqtt_proxy.rb in fluent-plugin-mqtt-io-0.4.0 vs lib/fluent/plugin/mqtt_proxy.rb in fluent-plugin-mqtt-io-0.4.1
- old
+ new
@@ -90,17 +90,22 @@
increment_retry_interval
after_disconnection
@client.disconnect if @client.connected?
end
+ def kill_connect_thread
+ @connect_thread.kill if !@connect_thread.nil?
+ end
+
def after_disconnection
# should be implemented
+ kill_connect_thread
end
- def rescue_disconnection(*block)
+ def rescue_disconnection
begin
- yield *block
+ yield
rescue MQTT::ProtocolException => e
# TODO:
# Currently MQTT::ProtocolException cannot be caught during @client.get
# and @client.publish. The reason must be investigated...
retry_connect(e, "Protocol error occurs.")
@@ -108,20 +113,27 @@
retry_connect(e, "Timeout error occurs.")
rescue SystemCallError => e
retry_connect(e, "System call error occurs.")
rescue StandardError=> e
retry_connect(e, "The other error occurs.")
+ rescue MQTT::NotConnectedException=> e
+ # Since MQTT::NotConnectedException is raised only on publish,
+ # connection error should be catched before this error.
+ # So, reconnection process is omitted for this Exception
+ # to prevent waistful increment of retry interval.
+ log.error "MQTT not connected exception occurs.,#{e.class},#{e.message}"
end
end
def after_connection
# should be implemented
# returns thread instance for monitor thread to wait
# for Exception raised by MQTT I/O
end
def connect
- thread_create("#{current_plugin_name}_monitor".to_sym) do
+ #@connect_thread = thread_create("#{current_plugin_name}_monitor".to_sym) do
+ @connect_thread = Thread.new do
rescue_disconnection do
@client.connect
log.debug "connected to mqtt broker #{@host}:#{@port} for #{current_plugin_name}"
init_retry_interval
thread = after_connection