lib/fluent/plugin/mqtt_output_mixin.rb in fluent-plugin-mqtt-io-0.0.6 vs lib/fluent/plugin/mqtt_output_mixin.rb in fluent-plugin-mqtt-io-0.1.0
- old
+ new
@@ -10,14 +10,15 @@
base.config_param :password, :string, :default => nil
base.config_param :ssl, :bool, :default => nil
base.config_param :ca_file, :string, :default => nil
base.config_param :key_file, :string, :default => nil
base.config_param :cert_file, :string, :default => nil
- base.config_param :time_key, :string, :default => nil
+ base.config_param :time_key, :string, :default => 'time'
base.config_param :time_format, :string, :default => nil
base.config_param :topic_rewrite_pattern, :string, :default => nil
base.config_param :topic_rewrite_replacement, :string, :default => nil
+ base.config_param :bulk_trans_sep, :string, :default => "\t"
end
require 'mqtt'
# This method is called before starting.
@@ -33,10 +34,11 @@
@password ||= conf['password']
@time_key ||= conf['time_key']
@time_format ||= conf['time_format']
@topic_rewrite_pattern ||= conf['topic_rewrite_pattern']
@topic_rewrite_replacement ||= conf['topic_rewrite_replacement']
+ @bulk_trans_sep ||= conf['bulk_trans_sep']
end
# This method is called when starting.
# Open sockets or files here.
def start
@@ -51,24 +53,48 @@
}
opts[:ssl] = @ssl if @ssl
opts[:ca_file] = @ca_file if @ca_file
opts[:cert_file] = @cert_file if @cert_file
opts[:key_file] = @key_file if @key_file
- @connect = MQTT::Client.connect(opts)
+ # In order to handle Exception raised from reading Thread
+ # in MQTT::Client caused by network disconnection (during read_byte),
+ # @connect_thread generates connection.
+ @client = MQTT::Client.new(opts)
+ @connect_thread = Thread.new do
+ while (true)
+ begin
+ @client.disconnect if @client.connected?
+ @client.connect
+ sleep
+ rescue MQTT::ProtocolException => pe
+ $log.debug "Handling #{pe.class}: #{pe.message}"
+ next
+ rescue Timeout::Error => te
+ $log.debug "Handling #{te.class}: #{te.message}"
+ next
+ end
+ end
+ end
end
# This method is called when shutting down.
# Shutdown the thread and close sockets or files here.
def shutdown
super
- @connect.disconnect
+ @client.disconnect
end
def format_time(time)
- if @time_format.nil?
+ case @time_format
+ when nil then
+ # default format is integer value
+ time
+ when "iso8601" then
+ # iso8601 format
Time.at(time).iso8601
else
+ # specified strftime format
Time.at(time).strftime(@time_format)
end
end
def timestamp_hash(time)