lib/fluent/plugin/mqtt_output_mixin.rb in fluent-plugin-mqtt-io-0.0.6 vs lib/fluent/plugin/mqtt_output_mixin.rb in fluent-plugin-mqtt-io-0.1.0

- old
+ new

@@ -10,14 +10,15 @@ base.config_param :password, :string, :default => nil base.config_param :ssl, :bool, :default => nil base.config_param :ca_file, :string, :default => nil base.config_param :key_file, :string, :default => nil base.config_param :cert_file, :string, :default => nil - base.config_param :time_key, :string, :default => nil + base.config_param :time_key, :string, :default => 'time' base.config_param :time_format, :string, :default => nil base.config_param :topic_rewrite_pattern, :string, :default => nil base.config_param :topic_rewrite_replacement, :string, :default => nil + base.config_param :bulk_trans_sep, :string, :default => "\t" end require 'mqtt' # This method is called before starting. @@ -33,10 +34,11 @@ @password ||= conf['password'] @time_key ||= conf['time_key'] @time_format ||= conf['time_format'] @topic_rewrite_pattern ||= conf['topic_rewrite_pattern'] @topic_rewrite_replacement ||= conf['topic_rewrite_replacement'] + @bulk_trans_sep ||= conf['bulk_trans_sep'] end # This method is called when starting. # Open sockets or files here. def start @@ -51,24 +53,48 @@ } opts[:ssl] = @ssl if @ssl opts[:ca_file] = @ca_file if @ca_file opts[:cert_file] = @cert_file if @cert_file opts[:key_file] = @key_file if @key_file - @connect = MQTT::Client.connect(opts) + # In order to handle Exception raised from reading Thread + # in MQTT::Client caused by network disconnection (during read_byte), + # @connect_thread generates connection. + @client = MQTT::Client.new(opts) + @connect_thread = Thread.new do + while (true) + begin + @client.disconnect if @client.connected? + @client.connect + sleep + rescue MQTT::ProtocolException => pe + $log.debug "Handling #{pe.class}: #{pe.message}" + next + rescue Timeout::Error => te + $log.debug "Handling #{te.class}: #{te.message}" + next + end + end + end end # This method is called when shutting down. # Shutdown the thread and close sockets or files here. def shutdown super - @connect.disconnect + @client.disconnect end def format_time(time) - if @time_format.nil? + case @time_format + when nil then + # default format is integer value + time + when "iso8601" then + # iso8601 format Time.at(time).iso8601 else + # specified strftime format Time.at(time).strftime(@time_format) end end def timestamp_hash(time)